[bgpcfgd]: Extract classes into their own files. Run bgpcfgd as a module (#5535)

1. Rename app module to bgpcfgd
2. Extract classes from one file to the module
This commit is contained in:
pavel-shirshov 2020-10-06 08:37:49 -07:00 committed by Abhishek Dosi
parent b01dfa174c
commit 969f77cf3e
26 changed files with 778 additions and 973 deletions

View File

@ -2,7 +2,7 @@
build/
dist/
*.egg-info/
app/*.pyc
bgpcfgd/*.pyc
tests/*.pyc
tests/__pycache__/
.idea

View File

@ -1,22 +0,0 @@
import subprocess
from .log import log_debug, log_err
def run_command(command, shell=False, hide_errors=False):
"""
Run a linux command. The command is defined as a list. See subprocess.Popen documentation on format
:param command: command to execute. Type: List of strings
:param shell: execute the command through shell when True. Type: Boolean
:param hide_errors: don't report errors to syslog when True. Type: Boolean
:return: Tuple: integer exit code from the command, stdout as a string, stderr as a string
"""
log_debug("execute command '%s'." % str(command))
p = subprocess.Popen(command, shell=shell, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
if p.returncode != 0:
if not hide_errors:
print_tuple = p.returncode, str(command), stdout, stderr
log_err("command execution returned %d. Command: '%s', stdout: '%s', stderr: '%s'" % print_tuple)
return p.returncode, stdout, stderr

View File

@ -1,923 +0,0 @@
#!/usr/bin/env python
import sys
import datetime
import time
import syslog
import signal
import traceback
import os
import json
from collections import defaultdict
import yaml
import jinja2
import netaddr
from swsscommon import swsscommon
from app.directory import Directory
from app.manager import Manager
from app.vars import g_debug
from app.log import log_debug, log_notice, log_info, log_warn, log_err, log_crit
from app.template import TemplateFabric
from app.config import ConfigMgr
from app.allow_list import BGPAllowListMgr
from app.util import run_command
g_run = True
class Directory(object):
""" This class stores values and notifies callbacks which were registered to be executed as soon
as some value is changed. This class works as DB cache mostly """
def __init__(self):
self.data = defaultdict(dict) # storage. A key is a slot name, a value is a dictionary with data
self.notify = defaultdict(lambda: defaultdict(list)) # registered callbacks: slot -> path -> handlers[]
@staticmethod
def get_slot_name(db, table):
""" Convert db, table pair into a slot name """
return db + "__" + table
def path_traverse(self, slot, path):
"""
Traverse a path in the storage.
If the path is an empty string, it returns a value as it is.
If the path is not an empty string, the method will traverse through the dictionary value.
Example:
self.data["key_1"] = { "abc": { "cde": { "fgh": "val_1", "ijk": "val_2" } } }
self.path_traverse("key_1", "abc/cde") will return True, { "fgh": "val_1", "ijk": "val_2" }
:param slot: storage key
:param path: storage path as a string where each internal key is separated by '/'
:return: a pair: True if the path was found, object if it was found
"""
if slot not in self.data:
return False, None
elif path == '':
return True, self.data[slot]
d = self.data[slot]
for p in path.split("/"):
if p not in d:
return False, None
d = d[p]
return True, d
def path_exist(self, db, table, path):
"""
Check if the path exists in the storage
:param db: db name
:param table: table name
:param path: requested path
:return: True if the path is available, False otherwise
"""
slot = self.get_slot_name(db, table)
return self.path_traverse(slot, path)[0]
def get_path(self, db, table, path):
"""
Return the requested path from the storage
:param db: db name
:param table: table name
:param path: requested path
:return: object if the path was found, None otherwise
"""
slot = self.get_slot_name(db, table)
return self.path_traverse(slot, path)[1]
def put(self, db, table, key, value):
"""
Put information into the storage. Notify handlers which are dependant to the information
:param db: db name
:param table: table name
:param key: key to change
:param value: value to put
:return:
"""
slot = self.get_slot_name(db, table)
self.data[slot][key] = value
if slot in self.notify:
for path in self.notify[slot].keys():
if self.path_exist(db, table, path):
for handler in self.notify[slot][path]:
handler()
def get(self, db, table, key):
"""
Get a value from the storage
:param db: db name
:param table: table name
:param key: ket to get
:return: value for the key
"""
slot = self.get_slot_name(db, table)
return self.data[slot][key]
def get_slot(self, db, table):
"""
Get an object from the storage
:param db: db name
:param table: table name
:return: object for the slot
"""
slot = self.get_slot_name(db, table)
return self.data[slot]
def remove(self, db, table, key):
"""
Remove a value from the storage
:param db: db name
:param table: table name
:param key: key to remove
"""
slot = self.get_slot_name(db, table)
if slot in self.data:
if key in self.data[slot]:
del self.data[slot][key]
else:
log_err("Directory: Can't remove key '%s' from slot '%s'. The key doesn't exist" % (key, slot))
else:
log_err("Directory: Can't remove key '%s' from slot '%s'. The slot doesn't exist" % (key, slot))
def remove_slot(self, db, table):
"""
Remove an object from the storage
:param db: db name
:param table: table name
"""
slot = self.get_slot_name(db, table)
if slot in self.data:
del self.data[slot]
else:
log_err("Directory: Can't remove slot '%s'. The slot doesn't exist" % slot)
def available(self, db, table):
"""
Check if the table is available
:param db: db name
:param table: table name
:return: True if the slot is available, False if not
"""
slot = self.get_slot_name(db, table)
return slot in self.data
def available_deps(self, deps):
"""
Check if all items from the deps list is available in the storage
:param deps: list of dependencies
:return: True if all dependencies are presented, False otherwise
"""
res = True
for db, table, path in deps:
res = res and self.path_exist(db, table, path)
return res
def subscribe(self, deps, handler):
"""
Subscribe the handler to be run as soon as all dependencies are presented
:param deps:
:param handler:
:return:
"""
for db, table, path in deps:
slot = self.get_slot_name(db, table)
self.notify[slot][path].append(handler)
class Runner(object):
""" Implements main io-loop of the application
It will run event handlers inside of Manager objects
when corresponding db/table is updated
"""
SELECT_TIMEOUT = 1000
def __init__(self):
""" Constructor """
self.db_connectors = {}
self.selector = swsscommon.Select()
self.callbacks = defaultdict(lambda: defaultdict(list)) # db -> table -> handlers[]
self.subscribers = set()
def add_manager(self, manager):
"""
Add a manager to the Runner.
As soon as new events will be receiving by Runner,
handlers of corresponding objects will be executed
:param manager: an object implementing Manager
"""
db_name = manager.get_database()
table_name = manager.get_table_name()
db = swsscommon.SonicDBConfig.getDbId(db_name)
if db not in self.db_connectors:
self.db_connectors[db] = swsscommon.DBConnector(db_name, 0)
if table_name not in self.callbacks[db]:
conn = self.db_connectors[db]
subscriber = swsscommon.SubscriberStateTable(conn, table_name)
self.subscribers.add(subscriber)
self.selector.addSelectable(subscriber)
self.callbacks[db][table_name].append(manager.handler)
def run(self):
""" Main loop """
while g_run:
state, _ = self.selector.select(Runner.SELECT_TIMEOUT)
if state == self.selector.TIMEOUT:
continue
elif state == self.selector.ERROR:
raise Exception("Received error from select")
for subscriber in self.subscribers:
key, op, fvs = subscriber.pop()
if not key:
continue
log_debug("Received message : '%s'" % str((key, op, fvs)))
for callback in self.callbacks[subscriber.getDbConnector().getDbId()][subscriber.getTableName()]:
callback(key, op, dict(fvs))
class Manager(object):
""" This class represents a SONiC DB table """
def __init__(self, common_objs, deps, database, table_name):
"""
Initialize class
:param common_objs: common object dictionary
:param deps: dependencies list
:param database: database name
:param table_name: table name
"""
self.directory = common_objs['directory']
self.cfg_mgr = common_objs['cfg_mgr']
self.constants = common_objs['constants']
self.deps = deps
self.db_name = database
self.table_name = table_name
self.set_queue = []
self.directory.subscribe(deps, self.on_deps_change) # subscribe this class method on directory changes
def get_database(self):
""" Return associated database """
return self.db_name
def get_table_name(self):
""" Return associated table name"""
return self.table_name
def handler(self, key, op, data):
"""
This method is executed on each add/remove event on the table.
:param key: key of the table entry
:param op: operation on the table entry. Could be either 'SET' or 'DEL'
:param data: associated data of the event. Empty for 'DEL' operation.
"""
if op == swsscommon.SET_COMMAND:
if self.directory.available_deps(self.deps): # all required dependencies are set in the Directory?
res = self.set_handler(key, data)
if not res: # set handler returned False, which means it is not ready to process is. Save it for later.
log_debug("'SET' handler returned NOT_READY for the Manager: %s" % self.__class__)
self.set_queue.append((key, data))
else:
log_debug("Not all dependencies are met for the Manager: %s" % self.__class__)
self.set_queue.append((key, data))
elif op == swsscommon.DEL_COMMAND:
self.del_handler(key)
else:
log_err("Invalid operation '%s' for key '%s'" % (op, key))
def on_deps_change(self):
""" This method is being executed on every dependency change """
if not self.directory.available_deps(self.deps):
return
new_queue = []
for key, data in self.set_queue:
res = self.set_handler(key, data)
if not res:
new_queue.append((key, data))
self.set_queue = new_queue
def set_handler(self, key, data):
""" Placeholder for 'SET' command """
log_err("set_handler() wasn't implemented for %s" % self.__class__.__name__)
def del_handler(self, key):
""" Placeholder for 'DEL' command """
log_err("del_handler wasn't implemented for %s" % self.__class__.__name__)
class BGPDataBaseMgr(Manager):
""" This class updates the Directory object when db table is updated """
def __init__(self, common_objs, db, table):
"""
Initialize the object
:param common_objs: common object dictionary
:param db: name of the db
:param table: name of the table in the db
"""
super(BGPDataBaseMgr, self).__init__(
common_objs,
[],
db,
table,
)
def set_handler(self, key, data):
""" Implementation of 'SET' command for this class """
self.directory.put(self.db_name, self.table_name, key, data)
return True
def del_handler(self, key):
""" Implementation of 'DEL' command for this class """
self.directory.remove(self.db_name, self.table_name, key)
class InterfaceMgr(Manager):
""" This class updates the Directory object when interface-related table is updated """
def __init__(self, common_objs, db, table):
"""
Initialize the object
:param common_objs: common object dictionary
:param db: name of the db
:param table: name of the table in the db
"""
super(InterfaceMgr, self).__init__(
common_objs,
[],
db,
table,
)
def set_handler(self, key, data):
""" Implementation of 'SET' command.
Similar to BGPDataBaseMgr but enriches data object with additional data """
# Interface table can have two keys,
# one with ip prefix and one without ip prefix
if '|' in key:
interface_name, network_str = key.split('|', 1)
try:
network = netaddr.IPNetwork(str(network_str))
except (netaddr.NotRegisteredError, netaddr.AddrFormatError, netaddr.AddrConversionError):
log_warn("Subnet '%s' format is wrong for interface '%s'" % (network_str, data["interface"]))
return True
data["interface"] = interface_name
data["prefixlen"] = str(network.prefixlen)
ip = str(network.ip)
self.directory.put("LOCAL", "local_addresses", ip, data)
self.directory.put(self.db_name, self.table_name, key, data)
self.directory.put("LOCAL", "interfaces", key, data)
return True
def del_handler(self, key):
""" Implementation of 'DEL' command
Also removes data object enrichment """
if '|' in key:
interface, network = key.split('|', 1)
try:
network = netaddr.IPNetwork(str(network))
except (netaddr.NotRegisteredError, netaddr.AddrFormatError, netaddr.AddrConversionError):
log_warn("Subnet '%s' format is wrong for interface '%s'" % (network, interface))
return
ip = str(network.ip)
self.directory.remove("LOCAL", "local_addresses", ip)
self.directory.remove(self.db_name, self.table_name, key)
self.directory.remove("LOCAL", "interfaces", key)
class BGPPeerGroupMgr(object):
""" This class represents peer-group and routing policy for the peer_type """
def __init__(self, common_objs, base_template):
"""
Construct the object
:param common_objs: common objects
:param base_template: path to the directory with Jinja2 templates
"""
self.cfg_mgr = common_objs['cfg_mgr']
self.constants = common_objs['constants']
tf = common_objs['tf']
self.policy_template = tf.from_file(base_template + "policies.conf.j2")
self.peergroup_template = tf.from_file(base_template + "peer-group.conf.j2")
def update(self, name, **kwargs):
"""
Update peer-group and routing policy for the peer with the name
:param name: name of the peer. Used for logging only
:param kwargs: dictionary with parameters for rendering
"""
rc_policy = self.update_policy(name, **kwargs)
rc_pg = self.update_pg(name, **kwargs)
return rc_policy and rc_pg
def update_policy(self, name, **kwargs):
"""
Update routing policy for the peer
:param name: name of the peer. Used for logging only
:param kwargs: dictionary with parameters for rendering
"""
try:
policy = self.policy_template.render(**kwargs)
except jinja2.TemplateError as e:
log_err("Can't render policy template name: '%s': %s" % (name, str(e)))
return False
return self.update_entity(policy, "Routing policy for peer '%s'" % name)
def update_pg(self, name, **kwargs):
"""
Update peer-group for the peer
:param name: name of the peer. Used for logging only
:param kwargs: dictionary with parameters for rendering
"""
try:
pg = self.peergroup_template.render(**kwargs)
except jinja2.TemplateError as e:
log_err("Can't render peer-group template: '%s': %s" % (name, str(e)))
return False
if kwargs['vrf'] == 'default':
cmd = ('router bgp %s\n' % kwargs['bgp_asn']) + pg
else:
cmd = ('router bgp %s vrf %s\n' % (kwargs['bgp_asn'], kwargs['vrf'])) + pg
return self.update_entity(cmd, "Peer-group for peer '%s'" % name)
def update_entity(self, cmd, txt):
"""
Send commands to FRR
:param cmd: commands to send in a raw form
:param txt: text for the syslog output
:return:
"""
ret_code = self.cfg_mgr.push(cmd)
if ret_code:
log_info("%s was updated" % txt)
else:
log_err("Can't update %s" % txt)
return ret_code
class BGPPeerMgrBase(Manager):
""" Manager of BGP peers """
def __init__(self, common_objs, db_name, table_name, peer_type, check_neig_meta):
"""
Initialize the object
:param common_objs: common objects
:param table_name: name of the table with peers
:param peer_type: type of the peers. It is used to find right templates
"""
self.common_objs = common_objs
self.constants = self.common_objs["constants"]
self.fabric = common_objs['tf']
self.peer_type = peer_type
base_template = "bgpd/templates/" + self.constants["bgp"]["peers"][peer_type]["template_dir"] + "/"
self.templates = {
"add": self.fabric.from_file(base_template + "instance.conf.j2"),
"delete": self.fabric.from_string('no neighbor {{ neighbor_addr }}'),
"shutdown": self.fabric.from_string('neighbor {{ neighbor_addr }} shutdown'),
"no shutdown": self.fabric.from_string('no neighbor {{ neighbor_addr }} shutdown'),
}
deps = [
("CONFIG_DB", swsscommon.CFG_DEVICE_METADATA_TABLE_NAME, "localhost/bgp_asn"),
("CONFIG_DB", swsscommon.CFG_LOOPBACK_INTERFACE_TABLE_NAME, "Loopback0"),
("LOCAL", "local_addresses", ""),
("LOCAL", "interfaces", ""),
]
if check_neig_meta:
self.check_neig_meta = 'bgp' in self.constants \
and 'use_neighbors_meta' in self.constants['bgp'] \
and self.constants['bgp']['use_neighbors_meta']
else:
self.check_neig_meta = False
self.check_deployment_id = 'bgp' in self.constants \
and 'use_deployment_id' in self.constants['bgp'] \
and self.constants['bgp']['use_deployment_id']
if self.check_neig_meta:
deps.append(("CONFIG_DB", swsscommon.CFG_DEVICE_NEIGHBOR_METADATA_TABLE_NAME, ""))
if self.check_deployment_id:
deps.append(("CONFIG_DB", swsscommon.CFG_DEVICE_METADATA_TABLE_NAME, "localhost/deployment_id"))
super(BGPPeerMgrBase, self).__init__(
common_objs,
deps,
db_name,
table_name,
)
self.peers = self.load_peers()
self.peer_group_mgr = BGPPeerGroupMgr(self.common_objs, base_template)
return
def set_handler(self, key, data):
"""
It runs on 'SET' command
:param key: key of the changed table
:param data: the data associated with the change
"""
vrf, nbr = self.split_key(key)
peer_key = (vrf, nbr)
if peer_key not in self.peers:
return self.add_peer(vrf, nbr, data)
else:
return self.update_peer(vrf, nbr, data)
def add_peer(self, vrf, nbr, data):
"""
Add a peer into FRR. This is used if the peer is not existed in FRR yet
:param vrf: vrf name. Name is equal "default" for the global vrf
:param nbr: neighbor ip address (name for dynamic peer type)
:param data: associated data
:return: True if this adding was successful, False otherwise
"""
print_data = vrf, nbr, data
bgp_asn = self.directory.get_slot("CONFIG_DB", swsscommon.CFG_DEVICE_METADATA_TABLE_NAME)["localhost"]["bgp_asn"]
#
lo0_ipv4 = self.get_lo0_ipv4()
if lo0_ipv4 is None:
log_warn("Loopback0 ipv4 address is not presented yet")
return False
#
if "local_addr" not in data:
log_warn("Peer %s. Missing attribute 'local_addr'" % nbr)
else:
# The bgp session that belongs to a vnet cannot be advertised as the default BGP session.
# So we need to check whether this bgp session belongs to a vnet.
data["local_addr"] = str(netaddr.IPNetwork(str(data["local_addr"])).ip)
interface = self.get_local_interface(data["local_addr"])
if not interface:
print_data = nbr, data["local_addr"]
log_debug("Peer '%s' with local address '%s' wait for the corresponding interface to be set" % print_data)
return False
vnet = self.get_vnet(interface)
if vnet:
# Ignore the bgp session that is in a vnet
log_info("Ignore the BGP peer '%s' as the interface '%s' is in vnet '%s'" % (nbr, interface, vnet))
return True
kwargs = {
'CONFIG_DB__DEVICE_METADATA': self.directory.get_slot("CONFIG_DB", swsscommon.CFG_DEVICE_METADATA_TABLE_NAME),
'constants': self.constants,
'bgp_asn': bgp_asn,
'vrf': vrf,
'neighbor_addr': nbr,
'bgp_session': data,
'loopback0_ipv4': lo0_ipv4,
}
if self.check_neig_meta:
neigmeta = self.directory.get_slot("CONFIG_DB", swsscommon.CFG_DEVICE_NEIGHBOR_METADATA_TABLE_NAME)
if 'name' in data and data["name"] not in neigmeta:
log_info("DEVICE_NEIGHBOR_METADATA is not ready for neighbor '%s' - '%s'" % (nbr, data['name']))
return False
kwargs['CONFIG_DB__DEVICE_NEIGHBOR_METADATA'] = neigmeta
tag = data['name'] if 'name' in data else nbr
self.peer_group_mgr.update(tag, **kwargs)
try:
cmd = self.templates["add"].render(**kwargs)
except jinja2.TemplateError as e:
msg = "Peer '(%s|%s)'. Error in rendering the template for 'SET' command '%s'" % print_data
log_err("%s: %s" % (msg, str(e)))
return True
if cmd is not None:
ret_code = self.apply_op(cmd, vrf)
key = (vrf, nbr)
if ret_code:
self.peers.add(key)
log_info("Peer '(%s|%s)' added with attributes '%s'" % print_data)
else:
log_err("Peer '(%s|%s)' wasn't added." % (vrf, nbr))
return True
def update_peer(self, vrf, nbr, data):
"""
Update a peer. This is used when the peer is already in the FRR
Update support only "admin_status" for now
:param vrf: vrf name. Name is equal "default" for the global vrf
:param nbr: neighbor ip address (name for dynamic peer type)
:param data: associated data
:return: True if this adding was successful, False otherwise
"""
if "admin_status" in data:
self.change_admin_status(vrf, nbr, data)
else:
log_err("Peer '(%s|%s)': Can't update the peer. Only 'admin_status' attribute is supported" % (vrf, nbr))
return True
def change_admin_status(self, vrf, nbr, data):
"""
Change admin status of a peer
:param vrf: vrf name. Name is equal "default" for the global vrf
:param nbr: neighbor ip address (name for dynamic peer type)
:param data: associated data
:return: True if this adding was successful, False otherwise
"""
if data['admin_status'] == 'up':
self.apply_admin_status(vrf, nbr, "no shutdown", "up")
elif data['admin_status'] == 'down':
self.apply_admin_status(vrf, nbr, "shutdown", "down")
else:
print_data = vrf, nbr, data['admin_status']
log_err("Peer '%s|%s': Can't update the peer. It has wrong attribute value attr['admin_status'] = '%s'" % print_data)
def apply_admin_status(self, vrf, nbr, template_name, admin_state):
"""
Render admin state template and apply the command to the FRR
:param vrf: vrf name. Name is equal "default" for the global vrf
:param nbr: neighbor ip address (name for dynamic peer type)
:param template_name: name of the template to render
:param admin_state: desired admin state
:return: True if this adding was successful, False otherwise
"""
print_data = vrf, nbr, admin_state
ret_code = self.apply_op(self.templates[template_name].render(neighbor_addr=nbr), vrf)
if ret_code:
log_info("Peer '%s|%s' admin state is set to '%s'" % print_data)
else:
log_err("Can't set peer '%s|%s' admin state to '%s'." % print_data)
def del_handler(self, key):
"""
'DEL' handler for the BGP PEER tables
:param key: key of the neighbor
"""
vrf, nbr = self.split_key(key)
peer_key = (vrf, nbr)
if peer_key not in self.peers:
log_warn("Peer '(%s|%s)' has not been found" % (vrf, nbr))
return
cmd = self.templates["delete"].render(neighbor_addr=nbr)
ret_code = self.apply_op(cmd, vrf)
if ret_code:
log_info("Peer '(%s|%s)' has been removed" % (vrf, nbr))
self.peers.remove(peer_key)
else:
log_err("Peer '(%s|%s)' hasn't been removed" % (vrf, nbr))
def apply_op(self, cmd, vrf):
"""
Push commands cmd into FRR
:param cmd: commands in raw format
:param vrf: vrf where the commands should be applied
:return: True if no errors, False if there are errors
"""
bgp_asn = self.directory.get_slot("CONFIG_DB", swsscommon.CFG_DEVICE_METADATA_TABLE_NAME)["localhost"]["bgp_asn"]
if vrf == 'default':
cmd = ('router bgp %s\n' % bgp_asn) + cmd
else:
cmd = ('router bgp %s vrf %s\n' % (bgp_asn, vrf)) + cmd
return self.cfg_mgr.push(cmd)
def get_lo0_ipv4(self):
"""
Extract Loopback0 ipv4 address from the Directory
:return: ipv4 address for Loopback0, None if nothing found
"""
loopback0_ipv4 = None
for loopback in self.directory.get_slot("CONFIG_DB", swsscommon.CFG_LOOPBACK_INTERFACE_TABLE_NAME).iterkeys():
if loopback.startswith("Loopback0|"):
loopback0_prefix_str = loopback.replace("Loopback0|", "")
loopback0_ip_str = loopback0_prefix_str[:loopback0_prefix_str.find('/')]
if TemplateFabric.is_ipv4(loopback0_ip_str):
loopback0_ipv4 = loopback0_ip_str
break
return loopback0_ipv4
def get_local_interface(self, local_addr):
"""
Get interface according to the local address from the directory
:param: directory: Directory object that stored metadata of interfaces
:param: local_addr: Local address of the interface
:return: Return the metadata of the interface with the local address
If the interface has not been set, return None
"""
local_addresses = self.directory.get_slot("LOCAL", "local_addresses")
# Check if the local address of this bgp session has been set
if local_addr not in local_addresses:
return None
local_address = local_addresses[local_addr]
interfaces = self.directory.get_slot("LOCAL", "interfaces")
# Check if the information for the interface of this local address has been set
if local_address.has_key("interface") and local_address["interface"] in interfaces:
return interfaces[local_address["interface"]]
else:
return None
@staticmethod
def get_vnet(interface):
"""
Get the VNet name of the interface
:param: interface: The metadata of the interface
:return: Return the vnet name of the interface if this interface belongs to a vnet,
Otherwise return None
"""
if interface.has_key("vnet_name") and interface["vnet_name"]:
return interface["vnet_name"]
else:
return None
@staticmethod
def split_key(key):
"""
Split key into ip address and vrf name. If there is no vrf, "default" would be return for vrf
:param key: key to split
:return: vrf name extracted from the key, peer ip address extracted from the key
"""
if '|' not in key:
return 'default', key
else:
return tuple(key.split('|', 1))
@staticmethod
def load_peers():
"""
Load peers from FRR.
:return: set of peers, which are already installed in FRR
"""
command = ["vtysh", "-c", "show bgp vrfs json"]
ret_code, out, err = run_command(command)
if ret_code == 0:
js_vrf = json.loads(out)
vrfs = js_vrf['vrfs'].keys()
else:
log_crit("Can't read bgp vrfs: %s" % err)
raise Exception("Can't read bgp vrfs: %s" % err)
peers = set()
for vrf in vrfs:
command = ["vtysh", "-c", 'show bgp vrf %s neighbors json' % str(vrf)]
ret_code, out, err = run_command(command)
if ret_code == 0:
js_bgp = json.loads(out)
for nbr in js_bgp.keys():
peers.add((vrf, nbr))
else:
log_crit("Can't read vrf '%s' neighbors: %s" % (vrf, str(err)))
raise Exception("Can't read vrf '%s' neighbors: %s" % (vrf, str(err)))
return peers
class ZebraSetSrc(Manager):
""" This class initialize "set src" settings for zebra """
def __init__(self, common_objs, db, table):
"""
Initialize the object
:param common_objs: common object dictionary
:param db: name of the db
:param table: name of the table in the db
"""
super(ZebraSetSrc, self).__init__(
common_objs,
[],
db,
table,
)
tf = common_objs['tf']
self.zebra_set_src_template = tf.from_file("zebra/zebra.set_src.conf.j2")
self.lo_ipv4 = None
self.lo_ipv6 = None
def set_handler(self, key, data):
""" Implementation of 'SET' command for this class """
self.directory.put(self.db_name, self.table_name, key, data)
#
if key.startswith("Loopback0|") and "state" in data and data["state"] == "ok":
ip_addr_w_mask = key.replace("Loopback0|", "")
slash_pos = ip_addr_w_mask.rfind("/")
if slash_pos == -1:
log_err("Wrong Loopback0 ip prefix: '%s'" % ip_addr_w_mask)
return True
ip_addr = ip_addr_w_mask[:slash_pos]
try:
if TemplateFabric.is_ipv4(ip_addr):
if self.lo_ipv4 is None:
self.lo_ipv4 = ip_addr
txt = self.zebra_set_src_template.render(rm_name="RM_SET_SRC", lo_ip=ip_addr, ip_proto="")
else:
log_warn("Update command is not supported for set src templates. current ip='%s'. new ip='%s'" % (self.lo_ipv4, ip_addr))
return True
elif TemplateFabric.is_ipv6(ip_addr):
if self.lo_ipv6 is None:
self.lo_ipv6 = ip_addr
txt = self.zebra_set_src_template.render(rm_name="RM_SET_SRC6", lo_ip=ip_addr, ip_proto="v6")
else:
log_warn("Update command is not supported for set src templates. current ip='%s'. new ip='%s'" % (self.lo_ipv6, ip_addr))
return True
else:
log_err("Got ambiguous ip address '%s'" % ip_addr)
return True
except jinja2.TemplateError as e:
log_err("Error while rendering 'set src' template: %s" % str(e))
return True
if self.cfg_mgr.push(txt):
log_info("The 'set src' configuration with Loopback0 ip '%s' was pushed" % ip_addr)
else:
log_err("The 'set src' configuration with Loopback0 ip '%s' wasn't pushed" % ip_addr)
return True
def del_handler(self, key):
""" Implementation of 'DEL' command for this class """
self.directory.remove(self.db_name, self.table_name, key)
log_warn("Delete command is not supported for 'zebra set src' templates")
def wait_for_daemons(daemons, seconds):
"""
Wait until FRR daemons are ready for requests
:param daemons: list of FRR daemons to wait
:param seconds: number of seconds to wait, until raise an error
"""
stop_time = datetime.datetime.now() + datetime.timedelta(seconds=seconds)
log_info("Start waiting for FRR daemons: %s" % str(datetime.datetime.now()))
while datetime.datetime.now() < stop_time:
ret_code, out, err = run_command(["vtysh", "-c", "show daemons"], hide_errors=True)
if ret_code == 0 and all(daemon in out for daemon in daemons):
log_info("All required daemons have connected to vtysh: %s" % str(datetime.datetime.now()))
return
else:
log_warn("Can't read daemon status from FRR: %s" % str(err))
time.sleep(0.1) # sleep 100 ms
raise RuntimeError("FRR daemons hasn't been started in %d seconds" % seconds)
def read_constants():
""" Read file with constants values from /etc/sonic/constants.yml """
with open('/etc/sonic/constants.yml') as fp:
content = yaml.load(fp) # FIXME: , Loader=yaml.FullLoader)
if "constants" not in content:
log_crit("/etc/sonic/constants.yml doesn't have 'constants' key")
raise Exception("/etc/sonic/constants.yml doesn't have 'constants' key")
return content["constants"]
def main():
""" Main function """
wait_for_daemons(["bgpd", "zebra", "staticd"], seconds=20)
#
common_objs = {
'directory': Directory(),
'cfg_mgr': ConfigMgr(),
'tf': TemplateFabric(),
'constants': read_constants(),
}
managers = [
# Config DB managers
BGPDataBaseMgr(common_objs, "CONFIG_DB", swsscommon.CFG_DEVICE_METADATA_TABLE_NAME),
BGPDataBaseMgr(common_objs, "CONFIG_DB", swsscommon.CFG_DEVICE_NEIGHBOR_METADATA_TABLE_NAME),
# Interface managers
InterfaceMgr(common_objs, "CONFIG_DB", swsscommon.CFG_INTF_TABLE_NAME),
InterfaceMgr(common_objs, "CONFIG_DB", swsscommon.CFG_LOOPBACK_INTERFACE_TABLE_NAME),
InterfaceMgr(common_objs, "CONFIG_DB", swsscommon.CFG_VLAN_INTF_TABLE_NAME),
InterfaceMgr(common_objs, "CONFIG_DB", swsscommon.CFG_LAG_INTF_TABLE_NAME),
# State DB managers
ZebraSetSrc(common_objs, "STATE_DB", swsscommon.STATE_INTERFACE_TABLE_NAME),
# Peer Managers
BGPPeerMgrBase(common_objs, "CONFIG_DB", swsscommon.CFG_BGP_NEIGHBOR_TABLE_NAME, "general", True),
BGPPeerMgrBase(common_objs, "CONFIG_DB", "BGP_MONITORS", "monitors", False),
BGPPeerMgrBase(common_objs, "CONFIG_DB", "BGP_PEER_RANGE", "dynamic", False),
# AllowList Managers
BGPAllowListMgr(common_objs, "CONFIG_DB", "BGP_ALLOWED_PREFIXES"),
]
runner = Runner()
for mgr in managers:
runner.add_manager(mgr)
runner.run()
def signal_handler(_, __): # signal_handler(signum, frame)
""" signal handler """
global g_run
g_run = False
if __name__ == '__main__':
rc = 0
try:
syslog.openlog('bgpcfgd')
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
main()
except KeyboardInterrupt:
log_notice("Keyboard interrupt")
except RuntimeError as exc:
log_crit(str(exc))
rc = -2
if g_debug:
raise
except Exception as exc:
log_crit("Got an exception %s: Traceback: %s" % (str(exc), traceback.format_exc()))
rc = -1
if g_debug:
raise
finally:
syslog.closelog()
try:
sys.exit(rc)
except SystemExit:
os._exit(rc)

View File

@ -0,0 +1,4 @@
from .main import main
if __name__ == '__main__':
main()

View File

@ -3,7 +3,7 @@ import tempfile
from .vars import g_debug
from .log import log_crit, log_err
from .util import run_command
from .utils import run_command
class ConfigMgr(object):

View File

@ -1,6 +1,6 @@
from collections import defaultdict
from app.log import log_err
from .log import log_err
class Directory(object):

View File

@ -0,0 +1,81 @@
import os
import signal
import sys
import syslog
import traceback
from swsscommon import swsscommon
from .config import ConfigMgr
from .directory import Directory
from .log import log_notice, log_crit
from .managers_allow_list import BGPAllowListMgr
from .managers_bgp import BGPPeerMgrBase
from .managers_db import BGPDataBaseMgr
from .managers_intf import InterfaceMgr
from .managers_setsrc import ZebraSetSrc
from .runner import Runner, signal_handler
from .template import TemplateFabric
from .utils import wait_for_daemons, read_constants
from .vars import g_debug
def do_work():
""" Main function """
wait_for_daemons(["bgpd", "zebra", "staticd"], seconds=20)
#
common_objs = {
'directory': Directory(),
'cfg_mgr': ConfigMgr(),
'tf': TemplateFabric(),
'constants': read_constants(),
}
managers = [
# Config DB managers
BGPDataBaseMgr(common_objs, "CONFIG_DB", swsscommon.CFG_DEVICE_METADATA_TABLE_NAME),
BGPDataBaseMgr(common_objs, "CONFIG_DB", swsscommon.CFG_DEVICE_NEIGHBOR_METADATA_TABLE_NAME),
# Interface managers
InterfaceMgr(common_objs, "CONFIG_DB", swsscommon.CFG_INTF_TABLE_NAME),
InterfaceMgr(common_objs, "CONFIG_DB", swsscommon.CFG_LOOPBACK_INTERFACE_TABLE_NAME),
InterfaceMgr(common_objs, "CONFIG_DB", swsscommon.CFG_VLAN_INTF_TABLE_NAME),
InterfaceMgr(common_objs, "CONFIG_DB", swsscommon.CFG_LAG_INTF_TABLE_NAME),
# State DB managers
ZebraSetSrc(common_objs, "STATE_DB", swsscommon.STATE_INTERFACE_TABLE_NAME),
# Peer Managers
BGPPeerMgrBase(common_objs, "CONFIG_DB", swsscommon.CFG_BGP_NEIGHBOR_TABLE_NAME, "general", True),
BGPPeerMgrBase(common_objs, "CONFIG_DB", "BGP_MONITORS", "monitors", False),
BGPPeerMgrBase(common_objs, "CONFIG_DB", "BGP_PEER_RANGE", "dynamic", False),
# AllowList Managers
BGPAllowListMgr(common_objs, "CONFIG_DB", "BGP_ALLOWED_PREFIXES"),
]
runner = Runner()
for mgr in managers:
runner.add_manager(mgr)
runner.run()
def main():
rc = 0
try:
syslog.openlog('bgpcfgd')
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
do_work()
except KeyboardInterrupt:
log_notice("Keyboard interrupt")
except RuntimeError as exc:
log_crit(str(exc))
rc = -2
if g_debug:
raise
except Exception as exc:
log_crit("Got an exception %s: Traceback: %s" % (str(exc), traceback.format_exc()))
rc = -1
if g_debug:
raise
finally:
syslog.closelog()
try:
sys.exit(rc)
except SystemExit:
os._exit(rc)

View File

@ -1,6 +1,6 @@
from swsscommon import swsscommon
from app.log import log_debug, log_err
from .log import log_debug, log_err
class Manager(object):

View File

@ -3,10 +3,10 @@ Implementation of "allow-list" feature
"""
import re
from app.log import log_debug, log_info, log_err, log_warn
from app.template import TemplateFabric
from app.manager import Manager
from app.util import run_command
from .log import log_debug, log_info, log_err, log_warn
from .template import TemplateFabric
from .manager import Manager
from .utils import run_command
class BGPAllowListMgr(Manager):
""" This class initialize "AllowList" settings """

View File

@ -0,0 +1,391 @@
import json
from swsscommon import swsscommon
import jinja2
import netaddr
from .log import log_warn, log_err, log_info, log_debug, log_crit
from .manager import Manager
from .template import TemplateFabric
from .utils import run_command
class BGPPeerGroupMgr(object):
""" This class represents peer-group and routing policy for the peer_type """
def __init__(self, common_objs, base_template):
"""
Construct the object
:param common_objs: common objects
:param base_template: path to the directory with Jinja2 templates
"""
self.cfg_mgr = common_objs['cfg_mgr']
self.constants = common_objs['constants']
tf = common_objs['tf']
self.policy_template = tf.from_file(base_template + "policies.conf.j2")
self.peergroup_template = tf.from_file(base_template + "peer-group.conf.j2")
def update(self, name, **kwargs):
"""
Update peer-group and routing policy for the peer with the name
:param name: name of the peer. Used for logging only
:param kwargs: dictionary with parameters for rendering
"""
rc_policy = self.update_policy(name, **kwargs)
rc_pg = self.update_pg(name, **kwargs)
return rc_policy and rc_pg
def update_policy(self, name, **kwargs):
"""
Update routing policy for the peer
:param name: name of the peer. Used for logging only
:param kwargs: dictionary with parameters for rendering
"""
try:
policy = self.policy_template.render(**kwargs)
except jinja2.TemplateError as e:
log_err("Can't render policy template name: '%s': %s" % (name, str(e)))
return False
return self.update_entity(policy, "Routing policy for peer '%s'" % name)
def update_pg(self, name, **kwargs):
"""
Update peer-group for the peer
:param name: name of the peer. Used for logging only
:param kwargs: dictionary with parameters for rendering
"""
try:
pg = self.peergroup_template.render(**kwargs)
except jinja2.TemplateError as e:
log_err("Can't render peer-group template: '%s': %s" % (name, str(e)))
return False
if kwargs['vrf'] == 'default':
cmd = ('router bgp %s\n' % kwargs['bgp_asn']) + pg
else:
cmd = ('router bgp %s vrf %s\n' % (kwargs['bgp_asn'], kwargs['vrf'])) + pg
return self.update_entity(cmd, "Peer-group for peer '%s'" % name)
def update_entity(self, cmd, txt):
"""
Send commands to FRR
:param cmd: commands to send in a raw form
:param txt: text for the syslog output
:return:
"""
ret_code = self.cfg_mgr.push(cmd)
if ret_code:
log_info("%s was updated" % txt)
else:
log_err("Can't update %s" % txt)
return ret_code
class BGPPeerMgrBase(Manager):
""" Manager of BGP peers """
def __init__(self, common_objs, db_name, table_name, peer_type, check_neig_meta):
"""
Initialize the object
:param common_objs: common objects
:param table_name: name of the table with peers
:param peer_type: type of the peers. It is used to find right templates
"""
self.common_objs = common_objs
self.constants = self.common_objs["constants"]
self.fabric = common_objs['tf']
self.peer_type = peer_type
base_template = "bgpd/templates/" + self.constants["bgp"]["peers"][peer_type]["template_dir"] + "/"
self.templates = {
"add": self.fabric.from_file(base_template + "instance.conf.j2"),
"delete": self.fabric.from_string('no neighbor {{ neighbor_addr }}'),
"shutdown": self.fabric.from_string('neighbor {{ neighbor_addr }} shutdown'),
"no shutdown": self.fabric.from_string('no neighbor {{ neighbor_addr }} shutdown'),
}
deps = [
("CONFIG_DB", swsscommon.CFG_DEVICE_METADATA_TABLE_NAME, "localhost/bgp_asn"),
("CONFIG_DB", swsscommon.CFG_LOOPBACK_INTERFACE_TABLE_NAME, "Loopback0"),
("LOCAL", "local_addresses", ""),
("LOCAL", "interfaces", ""),
]
if check_neig_meta:
self.check_neig_meta = 'bgp' in self.constants \
and 'use_neighbors_meta' in self.constants['bgp'] \
and self.constants['bgp']['use_neighbors_meta']
else:
self.check_neig_meta = False
self.check_deployment_id = 'bgp' in self.constants \
and 'use_deployment_id' in self.constants['bgp'] \
and self.constants['bgp']['use_deployment_id']
if self.check_neig_meta:
deps.append(("CONFIG_DB", swsscommon.CFG_DEVICE_NEIGHBOR_METADATA_TABLE_NAME, ""))
if self.check_deployment_id:
deps.append(("CONFIG_DB", swsscommon.CFG_DEVICE_METADATA_TABLE_NAME, "localhost/deployment_id"))
super(BGPPeerMgrBase, self).__init__(
common_objs,
deps,
db_name,
table_name,
)
self.peers = self.load_peers()
self.peer_group_mgr = BGPPeerGroupMgr(self.common_objs, base_template)
return
def set_handler(self, key, data):
"""
It runs on 'SET' command
:param key: key of the changed table
:param data: the data associated with the change
"""
vrf, nbr = self.split_key(key)
peer_key = (vrf, nbr)
if peer_key not in self.peers:
return self.add_peer(vrf, nbr, data)
else:
return self.update_peer(vrf, nbr, data)
def add_peer(self, vrf, nbr, data):
"""
Add a peer into FRR. This is used if the peer is not existed in FRR yet
:param vrf: vrf name. Name is equal "default" for the global vrf
:param nbr: neighbor ip address (name for dynamic peer type)
:param data: associated data
:return: True if this adding was successful, False otherwise
"""
print_data = vrf, nbr, data
bgp_asn = self.directory.get_slot("CONFIG_DB", swsscommon.CFG_DEVICE_METADATA_TABLE_NAME)["localhost"]["bgp_asn"]
#
lo0_ipv4 = self.get_lo0_ipv4()
if lo0_ipv4 is None:
log_warn("Loopback0 ipv4 address is not presented yet")
return False
#
if "local_addr" not in data:
log_warn("Peer %s. Missing attribute 'local_addr'" % nbr)
else:
# The bgp session that belongs to a vnet cannot be advertised as the default BGP session.
# So we need to check whether this bgp session belongs to a vnet.
data["local_addr"] = str(netaddr.IPNetwork(str(data["local_addr"])).ip)
interface = self.get_local_interface(data["local_addr"])
if not interface:
print_data = nbr, data["local_addr"]
log_debug("Peer '%s' with local address '%s' wait for the corresponding interface to be set" % print_data)
return False
vnet = self.get_vnet(interface)
if vnet:
# Ignore the bgp session that is in a vnet
log_info("Ignore the BGP peer '%s' as the interface '%s' is in vnet '%s'" % (nbr, interface, vnet))
return True
kwargs = {
'CONFIG_DB__DEVICE_METADATA': self.directory.get_slot("CONFIG_DB", swsscommon.CFG_DEVICE_METADATA_TABLE_NAME),
'constants': self.constants,
'bgp_asn': bgp_asn,
'vrf': vrf,
'neighbor_addr': nbr,
'bgp_session': data,
'loopback0_ipv4': lo0_ipv4,
}
if self.check_neig_meta:
neigmeta = self.directory.get_slot("CONFIG_DB", swsscommon.CFG_DEVICE_NEIGHBOR_METADATA_TABLE_NAME)
if 'name' in data and data["name"] not in neigmeta:
log_info("DEVICE_NEIGHBOR_METADATA is not ready for neighbor '%s' - '%s'" % (nbr, data['name']))
return False
kwargs['CONFIG_DB__DEVICE_NEIGHBOR_METADATA'] = neigmeta
tag = data['name'] if 'name' in data else nbr
self.peer_group_mgr.update(tag, **kwargs)
try:
cmd = self.templates["add"].render(**kwargs)
except jinja2.TemplateError as e:
msg = "Peer '(%s|%s)'. Error in rendering the template for 'SET' command '%s'" % print_data
log_err("%s: %s" % (msg, str(e)))
return True
if cmd is not None:
ret_code = self.apply_op(cmd, vrf)
key = (vrf, nbr)
if ret_code:
self.peers.add(key)
log_info("Peer '(%s|%s)' added with attributes '%s'" % print_data)
else:
log_err("Peer '(%s|%s)' wasn't added." % (vrf, nbr))
return True
def update_peer(self, vrf, nbr, data):
"""
Update a peer. This is used when the peer is already in the FRR
Update support only "admin_status" for now
:param vrf: vrf name. Name is equal "default" for the global vrf
:param nbr: neighbor ip address (name for dynamic peer type)
:param data: associated data
:return: True if this adding was successful, False otherwise
"""
if "admin_status" in data:
self.change_admin_status(vrf, nbr, data)
else:
log_err("Peer '(%s|%s)': Can't update the peer. Only 'admin_status' attribute is supported" % (vrf, nbr))
return True
def change_admin_status(self, vrf, nbr, data):
"""
Change admin status of a peer
:param vrf: vrf name. Name is equal "default" for the global vrf
:param nbr: neighbor ip address (name for dynamic peer type)
:param data: associated data
:return: True if this adding was successful, False otherwise
"""
if data['admin_status'] == 'up':
self.apply_admin_status(vrf, nbr, "no shutdown", "up")
elif data['admin_status'] == 'down':
self.apply_admin_status(vrf, nbr, "shutdown", "down")
else:
print_data = vrf, nbr, data['admin_status']
log_err("Peer '%s|%s': Can't update the peer. It has wrong attribute value attr['admin_status'] = '%s'" % print_data)
def apply_admin_status(self, vrf, nbr, template_name, admin_state):
"""
Render admin state template and apply the command to the FRR
:param vrf: vrf name. Name is equal "default" for the global vrf
:param nbr: neighbor ip address (name for dynamic peer type)
:param template_name: name of the template to render
:param admin_state: desired admin state
:return: True if this adding was successful, False otherwise
"""
print_data = vrf, nbr, admin_state
ret_code = self.apply_op(self.templates[template_name].render(neighbor_addr=nbr), vrf)
if ret_code:
log_info("Peer '%s|%s' admin state is set to '%s'" % print_data)
else:
log_err("Can't set peer '%s|%s' admin state to '%s'." % print_data)
def del_handler(self, key):
"""
'DEL' handler for the BGP PEER tables
:param key: key of the neighbor
"""
vrf, nbr = self.split_key(key)
peer_key = (vrf, nbr)
if peer_key not in self.peers:
log_warn("Peer '(%s|%s)' has not been found" % (vrf, nbr))
return
cmd = self.templates["delete"].render(neighbor_addr=nbr)
ret_code = self.apply_op(cmd, vrf)
if ret_code:
log_info("Peer '(%s|%s)' has been removed" % (vrf, nbr))
self.peers.remove(peer_key)
else:
log_err("Peer '(%s|%s)' hasn't been removed" % (vrf, nbr))
def apply_op(self, cmd, vrf):
"""
Push commands cmd into FRR
:param cmd: commands in raw format
:param vrf: vrf where the commands should be applied
:return: True if no errors, False if there are errors
"""
bgp_asn = self.directory.get_slot("CONFIG_DB", swsscommon.CFG_DEVICE_METADATA_TABLE_NAME)["localhost"]["bgp_asn"]
if vrf == 'default':
cmd = ('router bgp %s\n' % bgp_asn) + cmd
else:
cmd = ('router bgp %s vrf %s\n' % (bgp_asn, vrf)) + cmd
return self.cfg_mgr.push(cmd)
def get_lo0_ipv4(self):
"""
Extract Loopback0 ipv4 address from the Directory
:return: ipv4 address for Loopback0, None if nothing found
"""
loopback0_ipv4 = None
for loopback in self.directory.get_slot("CONFIG_DB", swsscommon.CFG_LOOPBACK_INTERFACE_TABLE_NAME).iterkeys():
if loopback.startswith("Loopback0|"):
loopback0_prefix_str = loopback.replace("Loopback0|", "")
loopback0_ip_str = loopback0_prefix_str[:loopback0_prefix_str.find('/')]
if TemplateFabric.is_ipv4(loopback0_ip_str):
loopback0_ipv4 = loopback0_ip_str
break
return loopback0_ipv4
def get_local_interface(self, local_addr):
"""
Get interface according to the local address from the directory
:param: directory: Directory object that stored metadata of interfaces
:param: local_addr: Local address of the interface
:return: Return the metadata of the interface with the local address
If the interface has not been set, return None
"""
local_addresses = self.directory.get_slot("LOCAL", "local_addresses")
# Check if the local address of this bgp session has been set
if local_addr not in local_addresses:
return None
local_address = local_addresses[local_addr]
interfaces = self.directory.get_slot("LOCAL", "interfaces")
# Check if the information for the interface of this local address has been set
if local_address.has_key("interface") and local_address["interface"] in interfaces:
return interfaces[local_address["interface"]]
else:
return None
@staticmethod
def get_vnet(interface):
"""
Get the VNet name of the interface
:param: interface: The metadata of the interface
:return: Return the vnet name of the interface if this interface belongs to a vnet,
Otherwise return None
"""
if interface.has_key("vnet_name") and interface["vnet_name"]:
return interface["vnet_name"]
else:
return None
@staticmethod
def split_key(key):
"""
Split key into ip address and vrf name. If there is no vrf, "default" would be return for vrf
:param key: key to split
:return: vrf name extracted from the key, peer ip address extracted from the key
"""
if '|' not in key:
return 'default', key
else:
return tuple(key.split('|', 1))
@staticmethod
def load_peers():
"""
Load peers from FRR.
:return: set of peers, which are already installed in FRR
"""
command = ["vtysh", "-c", "show bgp vrfs json"]
ret_code, out, err = run_command(command)
if ret_code == 0:
js_vrf = json.loads(out)
vrfs = js_vrf['vrfs'].keys()
else:
log_crit("Can't read bgp vrfs: %s" % err)
raise Exception("Can't read bgp vrfs: %s" % err)
peers = set()
for vrf in vrfs:
command = ["vtysh", "-c", 'show bgp vrf %s neighbors json' % str(vrf)]
ret_code, out, err = run_command(command)
if ret_code == 0:
js_bgp = json.loads(out)
for nbr in js_bgp.keys():
peers.add((vrf, nbr))
else:
log_crit("Can't read vrf '%s' neighbors: %s" % (vrf, str(err)))
raise Exception("Can't read vrf '%s' neighbors: %s" % (vrf, str(err)))
return peers

View File

@ -0,0 +1,28 @@
from .manager import Manager
class BGPDataBaseMgr(Manager):
""" This class updates the Directory object when db table is updated """
def __init__(self, common_objs, db, table):
"""
Initialize the object
:param common_objs: common object dictionary
:param db: name of the db
:param table: name of the table in the db
"""
super(BGPDataBaseMgr, self).__init__(
common_objs,
[],
db,
table,
)
def set_handler(self, key, data):
""" Implementation of 'SET' command for this class """
self.directory.put(self.db_name, self.table_name, key, data)
return True
def del_handler(self, key):
""" Implementation of 'DEL' command for this class """
self.directory.remove(self.db_name, self.table_name, key)

View File

@ -0,0 +1,56 @@
import netaddr
from .log import log_warn
from .manager import Manager
class InterfaceMgr(Manager):
""" This class updates the Directory object when interface-related table is updated """
def __init__(self, common_objs, db, table):
"""
Initialize the object
:param common_objs: common object dictionary
:param db: name of the db
:param table: name of the table in the db
"""
super(InterfaceMgr, self).__init__(
common_objs,
[],
db,
table,
)
def set_handler(self, key, data):
""" Implementation of 'SET' command.
Similar to BGPDataBaseMgr but enriches data object with additional data """
# Interface table can have two keys,
# one with ip prefix and one without ip prefix
if '|' in key:
interface_name, network_str = key.split('|', 1)
try:
network = netaddr.IPNetwork(str(network_str))
except (netaddr.NotRegisteredError, netaddr.AddrFormatError, netaddr.AddrConversionError):
log_warn("Subnet '%s' format is wrong for interface '%s'" % (network_str, data["interface"]))
return True
data["interface"] = interface_name
data["prefixlen"] = str(network.prefixlen)
ip = str(network.ip)
self.directory.put("LOCAL", "local_addresses", ip, data)
self.directory.put(self.db_name, self.table_name, key, data)
self.directory.put("LOCAL", "interfaces", key, data)
return True
def del_handler(self, key):
""" Implementation of 'DEL' command
Also removes data object enrichment """
if '|' in key:
interface, network = key.split('|', 1)
try:
network = netaddr.IPNetwork(str(network))
except (netaddr.NotRegisteredError, netaddr.AddrFormatError, netaddr.AddrConversionError):
log_warn("Subnet '%s' format is wrong for interface '%s'" % (network, interface))
return
ip = str(network.ip)
self.directory.remove("LOCAL", "local_addresses", ip)
self.directory.remove(self.db_name, self.table_name, key)
self.directory.remove("LOCAL", "interfaces", key)

View File

@ -0,0 +1,69 @@
import jinja2
from .log import log_err, log_warn, log_info
from .manager import Manager
from .template import TemplateFabric
class ZebraSetSrc(Manager):
""" This class initialize "set src" settings for zebra """
def __init__(self, common_objs, db, table):
"""
Initialize the object
:param common_objs: common object dictionary
:param db: name of the db
:param table: name of the table in the db
"""
super(ZebraSetSrc, self).__init__(
common_objs,
[],
db,
table,
)
tf = common_objs['tf']
self.zebra_set_src_template = tf.from_file("zebra/zebra.set_src.conf.j2")
self.lo_ipv4 = None
self.lo_ipv6 = None
def set_handler(self, key, data):
""" Implementation of 'SET' command for this class """
self.directory.put(self.db_name, self.table_name, key, data)
#
if key.startswith("Loopback0|") and "state" in data and data["state"] == "ok":
ip_addr_w_mask = key.replace("Loopback0|", "")
slash_pos = ip_addr_w_mask.rfind("/")
if slash_pos == -1:
log_err("Wrong Loopback0 ip prefix: '%s'" % ip_addr_w_mask)
return True
ip_addr = ip_addr_w_mask[:slash_pos]
try:
if TemplateFabric.is_ipv4(ip_addr):
if self.lo_ipv4 is None:
self.lo_ipv4 = ip_addr
txt = self.zebra_set_src_template.render(rm_name="RM_SET_SRC", lo_ip=ip_addr, ip_proto="")
else:
log_warn("Update command is not supported for set src templates. current ip='%s'. new ip='%s'" % (self.lo_ipv4, ip_addr))
return True
elif TemplateFabric.is_ipv6(ip_addr):
if self.lo_ipv6 is None:
self.lo_ipv6 = ip_addr
txt = self.zebra_set_src_template.render(rm_name="RM_SET_SRC6", lo_ip=ip_addr, ip_proto="v6")
else:
log_warn("Update command is not supported for set src templates. current ip='%s'. new ip='%s'" % (self.lo_ipv6, ip_addr))
return True
else:
log_err("Got ambiguous ip address '%s'" % ip_addr)
return True
except jinja2.TemplateError as e:
log_err("Error while rendering 'set src' template: %s" % str(e))
return True
if self.cfg_mgr.push(txt):
log_info("The 'set src' configuration with Loopback0 ip '%s' was pushed" % ip_addr)
else:
log_err("The 'set src' configuration with Loopback0 ip '%s' wasn't pushed" % ip_addr)
return True
def del_handler(self, key):
""" Implementation of 'DEL' command for this class """
self.directory.remove(self.db_name, self.table_name, key)
log_warn("Delete command is not supported for 'zebra set src' templates")

View File

@ -0,0 +1,65 @@
from collections import defaultdict
from swsscommon import swsscommon
from .log import log_debug
g_run = True
def signal_handler(_, __): # signal_handler(signum, frame)
""" signal handler """
global g_run
g_run = False
class Runner(object):
""" Implements main io-loop of the application
It will run event handlers inside of Manager objects
when corresponding db/table is updated
"""
SELECT_TIMEOUT = 1000
def __init__(self):
""" Constructor """
self.db_connectors = {}
self.selector = swsscommon.Select()
self.callbacks = defaultdict(lambda: defaultdict(list)) # db -> table -> handlers[]
self.subscribers = set()
def add_manager(self, manager):
"""
Add a manager to the Runner.
As soon as new events will be receiving by Runner,
handlers of corresponding objects will be executed
:param manager: an object implementing Manager
"""
db_name = manager.get_database()
table_name = manager.get_table_name()
db = swsscommon.SonicDBConfig.getDbId(db_name)
if db not in self.db_connectors:
self.db_connectors[db] = swsscommon.DBConnector(db_name, 0)
if table_name not in self.callbacks[db]:
conn = self.db_connectors[db]
subscriber = swsscommon.SubscriberStateTable(conn, table_name)
self.subscribers.add(subscriber)
self.selector.addSelectable(subscriber)
self.callbacks[db][table_name].append(manager.handler)
def run(self):
""" Main loop """
while g_run:
state, _ = self.selector.select(Runner.SELECT_TIMEOUT)
if state == self.selector.TIMEOUT:
continue
elif state == self.selector.ERROR:
raise Exception("Received error from select")
for subscriber in self.subscribers:
key, op, fvs = subscriber.pop()
if not key:
continue
log_debug("Received message : '%s'" % str((key, op, fvs)))
for callback in self.callbacks[subscriber.getDbConnector().getDbId()][subscriber.getTableName()]:
callback(key, op, dict(fvs))

View File

@ -0,0 +1,55 @@
import datetime
import subprocess
import time
import yaml
from .log import log_debug, log_err, log_info, log_warn, log_crit
def run_command(command, shell=False, hide_errors=False):
"""
Run a linux command. The command is defined as a list. See subprocess.Popen documentation on format
:param command: command to execute. Type: List of strings
:param shell: execute the command through shell when True. Type: Boolean
:param hide_errors: don't report errors to syslog when True. Type: Boolean
:return: Tuple: integer exit code from the command, stdout as a string, stderr as a string
"""
log_debug("execute command '%s'." % str(command))
p = subprocess.Popen(command, shell=shell, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
if p.returncode != 0:
if not hide_errors:
print_tuple = p.returncode, str(command), stdout, stderr
log_err("command execution returned %d. Command: '%s', stdout: '%s', stderr: '%s'" % print_tuple)
return p.returncode, stdout, stderr
def wait_for_daemons(daemons, seconds):
"""
Wait until FRR daemons are ready for requests
:param daemons: list of FRR daemons to wait
:param seconds: number of seconds to wait, until raise an error
"""
stop_time = datetime.datetime.now() + datetime.timedelta(seconds=seconds)
log_info("Start waiting for FRR daemons: %s" % str(datetime.datetime.now()))
while datetime.datetime.now() < stop_time:
ret_code, out, err = run_command(["vtysh", "-c", "show daemons"], hide_errors=True)
if ret_code == 0 and all(daemon in out for daemon in daemons):
log_info("All required daemons have connected to vtysh: %s" % str(datetime.datetime.now()))
return
else:
log_warn("Can't read daemon status from FRR: %s" % str(err))
time.sleep(0.1) # sleep 100 ms
raise RuntimeError("FRR daemons hasn't been started in %d seconds" % seconds)
def read_constants():
""" Read file with constants values from /etc/sonic/constants.yml """
with open('/etc/sonic/constants.yml') as fp:
content = yaml.load(fp) # FIXME: , Loader=yaml.FullLoader)
if "constants" not in content:
log_crit("/etc/sonic/constants.yml doesn't have 'constants' key")
raise Exception("/etc/sonic/constants.yml doesn't have 'constants' key")
return content["constants"]

View File

@ -1,2 +1,2 @@
[pytest]
addopts = --cov=app --cov-report term
addopts = --cov=bgpcfgd --cov-report term

View File

@ -9,13 +9,13 @@ setuptools.setup(name='sonic-bgpcfgd',
author_email='pavelsh@microsoft.com',
url='https://github.com/Azure/sonic-buildimage',
packages=setuptools.find_packages(),
scripts=['bgpcfgd'],
entry_points={
'console_scripts': [
'bgpcfgd = bgpcfgd.main:main',
'bgpmon = bgpmon.bgpmon:main',
]
},
install_requires=['jinja2>=2.10', 'netaddr', 'pyyaml'],
setup_requires=['pytest-runner'],
test_requires=['pytest', 'pytest-cov'],
tests_require=['pytest', 'pytest-cov'],
)

View File

@ -1,8 +1,9 @@
from app.directory import Directory
from app.template import TemplateFabric
import app
from bgpcfgd.directory import Directory
from bgpcfgd.template import TemplateFabric
import bgpcfgd
from mock import MagicMock, patch
swsscommon_module_mock = MagicMock()
global_constants = {
@ -22,14 +23,14 @@ global_constants = {
@patch.dict("sys.modules", swsscommon=swsscommon_module_mock)
def set_del_test(op, args, currect_config, expected_config):
from app.allow_list import BGPAllowListMgr
from bgpcfgd.managers_allow_list import BGPAllowListMgr
set_del_test.push_list_called = False
def push_list(args):
set_del_test.push_list_called = True
assert args == expected_config
return True
#
app.allow_list.run_command = lambda cmd: (0, "", "")
bgpcfgd.managers_allow_list.run_command = lambda cmd: (0, "", "")
#
cfg_mgr = MagicMock()
cfg_mgr.update.return_value = None
@ -188,7 +189,7 @@ def test_set_handler_with_community_data_is_already_presented():
@patch.dict("sys.modules", swsscommon=swsscommon_module_mock)
def test_set_handler_no_community_data_is_already_presented():
from app.allow_list import BGPAllowListMgr
from bgpcfgd.managers_allow_list import BGPAllowListMgr
cfg_mgr = MagicMock()
cfg_mgr.update.return_value = None
cfg_mgr.get_text.return_value = [
@ -311,7 +312,7 @@ def test_set_handler_no_community_update_prefixes_add():
@patch.dict("sys.modules", swsscommon=swsscommon_module_mock)
def test___set_handler_validate():
from app.allow_list import BGPAllowListMgr
from bgpcfgd.managers_allow_list import BGPAllowListMgr
cfg_mgr = MagicMock()
common_objs = {
'directory': Directory(),
@ -338,7 +339,7 @@ def test___set_handler_validate():
@patch.dict("sys.modules", swsscommon=swsscommon_module_mock)
def test___find_peer_group_by_deployment_id():
from app.allow_list import BGPAllowListMgr
from bgpcfgd.managers_allow_list import BGPAllowListMgr
cfg_mgr = MagicMock()
cfg_mgr.update.return_value = None
cfg_mgr.get_text.return_value = [
@ -432,7 +433,7 @@ def test___find_peer_group_by_deployment_id():
@patch.dict("sys.modules", swsscommon=swsscommon_module_mock)
def test___restart_peers_found_deployment_id():
from app.allow_list import BGPAllowListMgr
from bgpcfgd.managers_allow_list import BGPAllowListMgr
test___restart_peers_found_deployment_id.run_command_counter = 0
def run_command(cmd):
output = [
@ -454,13 +455,13 @@ def test___restart_peers_found_deployment_id():
mocked = MagicMock(name='_BGPAllowListMgr__find_peer_group_by_deployment_id')
mocked.return_value = ["BGP_TEST_PEER_GROUP_1", "BGP_TEST_PEER_GROUP_2"]
mgr._BGPAllowListMgr__find_peer_group_by_deployment_id = mocked
app.allow_list.run_command = run_command
bgpcfgd.managers_allow_list.run_command = run_command
rc = mgr._BGPAllowListMgr__restart_peers(5)
assert rc
@patch.dict("sys.modules", swsscommon=swsscommon_module_mock)
def test___restart_peers_not_found_deployment_id():
from app.allow_list import BGPAllowListMgr
from bgpcfgd.managers_allow_list import BGPAllowListMgr
def run_command(cmd):
assert cmd == ['vtysh', '-c', 'clear bgp * soft in']
return 0, "", ""
@ -475,7 +476,7 @@ def test___restart_peers_not_found_deployment_id():
mocked = MagicMock(name='_BGPAllowListMgr__find_peer_group_by_deployment_id')
mocked.return_value = []
mgr._BGPAllowListMgr__find_peer_group_by_deployment_id = mocked
app.allow_list.run_command = run_command
bgpcfgd.managers_allow_list.run_command = run_command
rc = mgr._BGPAllowListMgr__restart_peers(5)
assert rc

View File

@ -1,7 +1,7 @@
import os
import re
from app.template import TemplateFabric
from bgpcfgd.template import TemplateFabric
from .util import load_constants
TEMPLATE_PATH = os.path.abspath('../../dockers/docker-fpm-frr/frr')

View File

@ -1,4 +1,4 @@
from app.template import TemplateFabric
from bgpcfgd.template import TemplateFabric
from collections import OrderedDict
import pytest

View File

@ -2,7 +2,7 @@ import os
import subprocess
from app.config import ConfigMgr
from bgpcfgd.config import ConfigMgr
from .test_templates import compress_comments, write_result

View File

@ -2,8 +2,8 @@ import os
import json
from app.template import TemplateFabric
from app.config import ConfigMgr
from bgpcfgd.template import TemplateFabric
from bgpcfgd.config import ConfigMgr
from .util import load_constants
TEMPLATE_PATH = os.path.abspath('../../dockers/docker-fpm-frr/frr')