5e17126ffe
The key used to look up entries in self.peers was incorrect: it needs to be a (vrf, nbr) tuple. Updated for both add and del.
1139 lines
43 KiB
Python
Executable File
1139 lines
43 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
import sys
|
|
import subprocess
|
|
import datetime
|
|
import time
|
|
import syslog
|
|
import signal
|
|
import traceback
|
|
import os
|
|
import tempfile
|
|
import json
|
|
from collections import defaultdict, OrderedDict
|
|
from pprint import pprint
|
|
from functools import partial
|
|
|
|
import yaml
|
|
import jinja2
|
|
import netaddr
|
|
from swsscommon import swsscommon
|
|
|
|
|
|
# Runner.run() keeps polling for events while this flag is true.
g_run = True
# When true, log_debug() emits its messages and ConfigMgr.write() keeps its temporary file.
g_debug = False
|
|
|
|
|
|
def log_debug(msg):
    """
    Send a message msg to the syslog as DEBUG.
    The message is dropped unless the module-level g_debug flag is set.
    :param msg: text of the message
    """
    if not g_debug:
        return
    syslog.syslog(syslog.LOG_DEBUG, msg)
|
|
|
|
def log_notice(msg):
    """
    Send a message msg to the syslog as NOTICE
    :param msg: text of the message
    """
    priority = syslog.LOG_NOTICE
    syslog.syslog(priority, msg)
|
|
|
|
def log_info(msg):
    """
    Send a message msg to the syslog as INFO
    :param msg: text of the message
    """
    priority = syslog.LOG_INFO
    syslog.syslog(priority, msg)
|
|
|
|
def log_warn(msg):
    """
    Send a message msg to the syslog as WARNING
    :param msg: text of the message
    """
    priority = syslog.LOG_WARNING
    syslog.syslog(priority, msg)
|
|
|
|
def log_err(msg):
    """
    Send a message msg to the syslog as ERR
    :param msg: text of the message
    """
    priority = syslog.LOG_ERR
    syslog.syslog(priority, msg)
|
|
|
|
def log_crit(msg):
    """
    Send a message msg to the syslog as CRIT
    :param msg: text of the message
    """
    priority = syslog.LOG_CRIT
    syslog.syslog(priority, msg)
|
|
|
|
|
|
def run_command(command, shell=False, hide_errors=False):
    """
    Execute a linux command and wait until it finishes.
    The command is handed to subprocess.Popen as is, see its documentation on the format.
    :param command: command to execute. Type: List of strings
    :param shell: execute the command through shell when True. Type: Boolean
    :param hide_errors: don't report errors to syslog when True. Type: Boolean
    :return: Tuple: integer exit code from the command, stdout as a string, stderr as a string
    """
    log_debug("execute command '%s'." % str(command))
    proc = subprocess.Popen(command, shell=shell, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = proc.communicate()
    rc = proc.returncode
    # A non-zero exit code is reported to syslog unless the caller asked for silence
    if rc != 0 and not hide_errors:
        details = (rc, str(command), out, err)
        log_err("command execution returned %d. Command: '%s', stdout: '%s', stderr: '%s'" % details)

    return rc, out, err
|
|
|
|
|
|
class ConfigMgr(object):
    """ Caches the FRR running configuration and pushes configuration changes into FRR via vtysh """
    def __init__(self):
        # Running config in the canonical format. None means that nothing is cached
        self.current_config = None

    def reset(self):
        """ Forget the cached config """
        self.current_config = None

    def update(self):
        """
        Re-read the running config from FRR ("show running-config") and cache it in the canonical format.
        The cache stays empty (None) when vtysh returns an error.
        """
        self.current_config = None
        ret_code, out, err = run_command(["vtysh", "-c", "show running-config"])
        if ret_code == 0:
            self.current_config = self.to_canonical(out)
        else:
            log_crit("can't update running config: rc=%d out='%s' err='%s'" % (ret_code, out, err))

    def push(self, cmd):
        """
        Push new changes to FRR. This is a thin wrapper around write()
        :param cmd: configuration change for FRR. Type: String
        :return: True if change was applied successfully, False otherwise
        """
        return self.write(cmd)

    def write(self, cmd):
        """
        Write configuration change to FRR.
        The change is stored into a temporary file under /tmp, which is then fed to "vtysh -f".
        :param cmd: new configuration to write into FRR. Type: String
        :return: True if change was applied successfully, False otherwise
        """
        fd, tmp_filename = tempfile.mkstemp(dir='/tmp')
        os.close(fd)
        with open(tmp_filename, 'w') as fp:
            fp.write("%s\n" % cmd)
        ret_code, out, err = run_command(["vtysh", "-f", tmp_filename])
        # In the debug mode the temporary file is kept
        if not g_debug:
            os.remove(tmp_filename)
        applied = ret_code == 0
        if applied:
            self.current_config = None  # invalidate config
        else:
            err_tuple = str(cmd), ret_code, out, err
            log_err("ConfigMgr::push(): can't push configuration '%s', rc='%d', stdout='%s', stderr='%s'" % err_tuple)
        return applied

    @staticmethod
    def to_canonical(raw_config):
        """
        Convert FRR config into canonical format.
        In the canonical format every configuration line is a list: all its parent lines
        followed by the line itself. Comments ('!') and empty lines are dropped.
        :param raw_config: config in frr format
        :return: frr config in canonical format
        """
        raw_lines = raw_config.split("\n")
        path = [raw_lines[0]]
        depth = 0
        canonical = []
        for raw_line in raw_lines:
            text = raw_line.strip()
            if text == '' or text.startswith('!'):
                continue
            indent = ConfigMgr.count_spaces(raw_line)
            # The indentation may change by one level at most between two adjacent lines
            assert abs(indent - depth) <= 1
            if indent < depth:
                # one level up: drop the previous line and its parent, then put this line in place
                del path[-2:]
                path.append(text)
            elif indent > depth:
                path.append(text)
            else:
                path[-1] = text
            canonical.append(list(path))
            depth = indent
        return canonical

    @staticmethod
    def count_spaces(line):
        """ Return number of leading whitespace characters in the line """
        stripped = line.lstrip()
        return len(line) - len(stripped)

    @staticmethod
    def from_canonical(canonical_config):
        """
        Convert config from canonical format into FRR raw format.
        Every entry becomes one line, indented by one space per parent.
        :param canonical_config: config in a canonical format
        :return: config in the FRR raw format
        """
        return "".join(" " * (len(path) - 1) + path[-1] + "\n" for path in canonical_config)
|
|
|
|
|
|
class TemplateFabric(object):
    """ Builds Jinja2 templates in an environment extended with network-related filters """
    def __init__(self):
        loader = jinja2.FileSystemLoader(['/usr/share/sonic/templates'])
        env = jinja2.Environment(loader=loader, trim_blocks=False)
        # custom filters available inside of templates
        for filter_name, filter_func in (
            ('ipv4', self.is_ipv4),
            ('ipv6', self.is_ipv6),
            ('pfx_filter', self.pfx_filter),
        ):
            env.filters[filter_name] = filter_func
        # one filter per IPNetwork attribute
        for attr in ['ip', 'network', 'prefixlen', 'netmask']:
            env.filters[attr] = partial(self.prefix_attr, attr)
        self.env = env

    def from_file(self, filename):
        """
        Load a template from a file. The file is resolved by the loader of the environment
        :param filename: filename of the file. Type String
        :return: Jinja2 template object
        """
        return self.env.get_template(filename)

    def from_string(self, tmpl):
        """
        Build a template from its text
        :param tmpl: Text representation of Jinja2 template
        :return: Jinja2 template object
        """
        return self.env.from_string(tmpl)

    @staticmethod
    def is_ipv4(value):
        """ Return True if the value is an ipv4 address. Empty or unparsable values give False """
        if not value:
            return False
        addr = value
        if not isinstance(value, netaddr.IPNetwork):
            try:
                addr = netaddr.IPNetwork(str(value))
            except (netaddr.NotRegisteredError, netaddr.AddrFormatError, netaddr.AddrConversionError):
                return False
        return addr.version == 4

    @staticmethod
    def is_ipv6(value):
        """ Return True if the value is an ipv6 address. Empty or unparsable values give False """
        if not value:
            return False
        addr = value
        if not isinstance(value, netaddr.IPNetwork):
            try:
                addr = netaddr.IPNetwork(str(value))
            except (netaddr.NotRegisteredError, netaddr.AddrFormatError, netaddr.AddrConversionError):
                return False
        return addr.version == 6

    @staticmethod
    def prefix_attr(attr, value):
        """
        Extract attribute from IPNetwork object
        :param attr: attribute to extract
        :param value: the string representation of ip prefix which will be converted to IPNetwork.
        :return: the extracted attribute converted to a string, None if the value is empty or can't be parsed
        """
        if not value:
            return None
        try:
            prefix = netaddr.IPNetwork(str(value))
        except (netaddr.NotRegisteredError, netaddr.AddrFormatError, netaddr.AddrConversionError):
            return None
        return str(getattr(prefix, attr))

    @staticmethod
    def pfx_filter(value):
        """INTERFACE Table can have keys in one of the two formats:
           string or tuple - This filter skips the string keys and only
           take into account the tuple.
           For eg - VLAN_INTERFACE|Vlan1000 vs VLAN_INTERFACE|Vlan1000|192.168.0.1/21
           :param value: table to filter
           :return: OrderedDict with the tuple-keyed entries only. It is empty for an empty value
        """
        if not value:
            return OrderedDict()
        return OrderedDict((key, val) for key, val in value.items() if isinstance(key, tuple))
|
|
|
|
|
|
class Directory(object):
    """ This class stores values and notifies callbacks which were registered to be executed as soon
        as some value is changed. This class works as DB cache mostly """
    def __init__(self):
        self.data = defaultdict(dict)  # storage. A key is a slot name, a value is a dictionary with data
        self.notify = defaultdict(lambda: defaultdict(list))  # registered callbacks: slot -> path -> handlers[]

    @staticmethod
    def get_slot_name(db, table):
        """ Convert db, table pair into a slot name """
        return db + "__" + table

    def path_traverse(self, slot, path):
        """
        Traverse a path in the storage.
        If the path is an empty string, it returns a value as it is.
        If the path is not an empty string, the method will traverse through the dictionary value.
        Example:
        self.data["key_1"] = { "abc": { "cde": { "fgh": "val_1", "ijk": "val_2" } } }
        self.path_traverse("key_1", "abc/cde") will return True, { "fgh": "val_1", "ijk": "val_2" }
        A path which goes deeper than a leaf value (a string, a number, ...) is reported as not found.
        :param slot: storage key
        :param path: storage path as a string where each internal key is separated by '/'
        :return: a pair: True if the path was found, object if it was found
        """
        if slot not in self.data:
            return False, None
        elif path == '':
            return True, self.data[slot]
        d = self.data[slot]
        for p in path.split("/"):
            try:
                if p not in d:
                    return False, None
                d = d[p]
            except TypeError:
                # d is a leaf value, not a dictionary: either it doesn't support 'in' at all,
                # or (for a string) p is a substring of it, but can't be used as an index
                return False, None
        return True, d

    def path_exist(self, db, table, path):
        """
        Check if the path exists in the storage
        :param db: db name
        :param table: table name
        :param path: requested path
        :return: True if the path is available, False otherwise
        """
        slot = self.get_slot_name(db, table)
        return self.path_traverse(slot, path)[0]

    def get_path(self, db, table, path):
        """
        Return the requested path from the storage
        :param db: db name
        :param table: table name
        :param path: requested path
        :return: object if the path was found, None otherwise
        """
        slot = self.get_slot_name(db, table)
        return self.path_traverse(slot, path)[1]

    def put(self, db, table, key, value):
        """
        Put information into the storage. Notify handlers which are dependant to the information
        Every handler subscribed to a path of this slot is called when that path exists after the update.
        :param db: db name
        :param table: table name
        :param key: key to change
        :param value: value to put
        :return:
        """
        slot = self.get_slot_name(db, table)
        self.data[slot][key] = value
        # The membership test prevents the defaultdict from creating an empty entry for the slot
        if slot in self.notify:
            # Iterate over a snapshot of the paths: a handler may subscribe to this slot while it runs
            for path in list(self.notify[slot]):
                if self.path_exist(db, table, path):
                    for handler in self.notify[slot][path]:
                        handler()

    def get(self, db, table, key):
        """
        Get a value from the storage
        NOTE: the storage is a defaultdict, so asking for a slot which doesn't exist creates an empty slot
        before KeyError is raised for the key.
        :param db: db name
        :param table: table name
        :param key: key to get
        :return: value for the key
        """
        slot = self.get_slot_name(db, table)
        return self.data[slot][key]

    def get_slot(self, db, table):
        """
        Get an object from the storage
        NOTE: the storage is a defaultdict, so asking for a slot which doesn't exist creates
        and returns an empty slot.
        :param db: db name
        :param table: table name
        :return: object for the slot
        """
        slot = self.get_slot_name(db, table)
        return self.data[slot]

    def remove(self, db, table, key):
        """
        Remove a value from the storage. A missing slot or a missing key is reported to syslog as an error
        :param db: db name
        :param table: table name
        :param key: key to remove
        """
        slot = self.get_slot_name(db, table)
        if slot in self.data:
            if key in self.data[slot]:
                del self.data[slot][key]
            else:
                log_err("Directory: Can't remove key '%s' from slot '%s'. The key doesn't exist" % (key, slot))
        else:
            log_err("Directory: Can't remove key '%s' from slot '%s'. The slot doesn't exist" % (key, slot))

    def remove_slot(self, db, table):
        """
        Remove an object from the storage. A missing slot is reported to syslog as an error
        :param db: db name
        :param table: table name
        """
        slot = self.get_slot_name(db, table)
        if slot in self.data:
            del self.data[slot]
        else:
            log_err("Directory: Can't remove slot '%s'. The slot doesn't exist" % slot)

    def available(self, db, table):
        """
        Check if the table is available
        :param db: db name
        :param table: table name
        :return: True if the slot is available, False if not
        """
        slot = self.get_slot_name(db, table)
        return slot in self.data

    def available_deps(self, deps):
        """
        Check if all items from the deps list is available in the storage
        :param deps: list of dependencies. Every dependency is a tuple (db, table, path)
        :return: True if all dependencies are presented, False otherwise
        """
        return all(self.path_exist(db, table, path) for db, table, path in deps)

    def subscribe(self, deps, handler):
        """
        Subscribe the handler to be run as soon as all dependencies are presented
        The handler is registered for every path from the deps list and is called without arguments.
        :param deps: list of dependencies. Every dependency is a tuple (db, table, path)
        :param handler: callable to register
        :return:
        """
        for db, table, path in deps:
            slot = self.get_slot_name(db, table)
            self.notify[slot][path].append(handler)
|
|
|
|
|
|
class Runner(object):
    """ Implements main io-loop of the application
        It will run event handlers inside of Manager objects
        when corresponding db/table is updated
    """
    SELECT_TIMEOUT = 1000  # timeout value passed to every select() call

    def __init__(self):
        """ Constructor """
        self.db_connectors = {}  # db id -> DBConnector
        self.selector = swsscommon.Select()
        self.callbacks = defaultdict(lambda: defaultdict(list))  # db -> table -> handlers[]
        self.subscribers = set()

    def add_manager(self, manager):
        """
        Add a manager to the Runner.
        As soon as new events will be receiving by Runner,
        handlers of corresponding objects will be executed.
        One connector is created per database and one subscriber per (database, table) pair.
        :param manager: an object implementing Manager
        """
        db_name = manager.get_database()
        table_name = manager.get_table_name()
        db_id = swsscommon.SonicDBConfig.getDbId(db_name)
        if db_id not in self.db_connectors:
            self.db_connectors[db_id] = swsscommon.DBConnector(db_name, 0)

        # The table is subscribed only once, even when several managers listen to it
        if table_name not in self.callbacks[db_id]:
            subscriber = swsscommon.SubscriberStateTable(self.db_connectors[db_id], table_name)
            self.subscribers.add(subscriber)
            self.selector.addSelectable(subscriber)
        self.callbacks[db_id][table_name].append(manager.handler)

    def run(self):
        """
        Main loop. It works while the module-level flag g_run is set.
        An error state returned by select() raises Exception
        """
        while g_run:
            state, _ = self.selector.select(Runner.SELECT_TIMEOUT)
            if state == self.selector.TIMEOUT:
                continue
            if state == self.selector.ERROR:
                raise Exception("Received error from select")

            for subscriber in self.subscribers:
                key, op, fvs = subscriber.pop()
                # A subscriber which returned an empty key is skipped
                if not key:
                    continue
                log_debug("Received message : '%s'" % str((key, op, fvs)))
                db_id = subscriber.getDbConnector().getDbId()
                table_name = subscriber.getTableName()
                for callback in self.callbacks[db_id][table_name]:
                    callback(key, op, dict(fvs))
|
|
|
|
|
|
class Manager(object):
    """ This class represents a SONiC DB table """
    def __init__(self, common_objs, deps, database, table_name):
        """
        Initialize class
        :param common_objs: common object dictionary. Keys 'directory', 'cfg_mgr' and 'constants' are used
        :param deps: dependencies list
        :param database: database name
        :param table_name: table name
        """
        self.directory = common_objs['directory']
        self.cfg_mgr = common_objs['cfg_mgr']
        self.constants = common_objs['constants']
        self.deps = deps
        self.db_name = database
        self.table_name = table_name
        self.set_queue = []  # (key, data) pairs of 'SET' events which are waiting to be processed
        # subscribe this class method on directory changes
        self.directory.subscribe(deps, self.on_deps_change)

    def get_database(self):
        """ Return associated database """
        return self.db_name

    def get_table_name(self):
        """ Return associated table name"""
        return self.table_name

    def handler(self, key, op, data):
        """
        This method is executed on each add/remove event on the table.
        A 'SET' event is put into the queue when either the dependencies are not available yet,
        or set_handler() returned a false value.
        :param key: key of the table entry
        :param op: operation on the table entry. Could be either 'SET' or 'DEL'
        :param data: associated data of the event. Empty for 'DEL' operation.
        """
        if op == swsscommon.SET_COMMAND:
            if not self.directory.available_deps(self.deps):
                log_debug("Not all dependencies are met for the Manager: %s" % self.__class__)
                self.set_queue.append((key, data))
            elif not self.set_handler(key, data):
                # set handler is not ready to process the event. Save it for later.
                log_debug("'SET' handler returned NOT_READY for the Manager: %s" % self.__class__)
                self.set_queue.append((key, data))
        elif op == swsscommon.DEL_COMMAND:
            self.del_handler(key)
        else:
            log_err("Invalid operation '%s' for key '%s'" % (op, key))

    def on_deps_change(self):
        """
        This method is being executed on every dependency change.
        When all dependencies are available it replays the queued 'SET' events,
        and keeps in the queue the events which still can't be processed
        """
        if not self.directory.available_deps(self.deps):
            return
        self.set_queue = [(key, data) for key, data in self.set_queue if not self.set_handler(key, data)]

    def set_handler(self, key, data):
        """ Placeholder for 'SET' command """
        log_err("set_handler() wasn't implemented for %s" % self.__class__.__name__)

    def del_handler(self, key):
        """ Placeholder for 'DEL' command """
        log_err("del_handler wasn't implemented for %s" % self.__class__.__name__)
|
|
|
|
|
|
class BGPDataBaseMgr(Manager):
    """ This class updates the Directory object when db table is updated """
    def __init__(self, common_objs, db, table):
        """
        Initialize the object. The manager has no dependencies
        :param common_objs: common object dictionary
        :param db: name of the db
        :param table: name of the table in the db
        """
        no_deps = []
        super(BGPDataBaseMgr, self).__init__(common_objs, no_deps, db, table)

    def set_handler(self, key, data):
        """
        Implementation of 'SET' command for this class.
        The data is stored in the Directory under the db/table of the manager
        :return: True always
        """
        self.directory.put(self.db_name, self.table_name, key, data)

        return True

    def del_handler(self, key):
        """
        Implementation of 'DEL' command for this class.
        The key is removed from the Directory
        """
        self.directory.remove(self.db_name, self.table_name, key)
|
|
|
|
|
|
class InterfaceMgr(Manager):
    """ This class updates the Directory object when interface-related table is updated """
    def __init__(self, common_objs, db, table):
        """
        Initialize the object. The manager has no dependencies
        :param common_objs: common object dictionary
        :param db: name of the db
        :param table: name of the table in the db
        """
        super(InterfaceMgr, self).__init__(
            common_objs,
            [],
            db,
            table,
        )

    def set_handler(self, key, data):
        """ Implementation of 'SET' command.
        Similar to BGPDataBaseMgr but enriches data object with additional data
        For a key with an ip prefix, "interface" and "prefixlen" are added into the data,
        and the data is also stored in LOCAL/local_addresses under the ip address.
        :param key: key of the table entry: either "<interface>" or "<interface>|<ip prefix>"
        :param data: data of the table entry. It is changed in place
        :return: True always. An entry with a malformed prefix is logged and dropped
        """
        # Interface table can have two keys,
        # one with ip prefix and one without ip prefix
        if '|' in key:
            interface_name, network_str = key.split('|', 1)
            try:
                network = netaddr.IPNetwork(str(network_str))
            except (netaddr.NotRegisteredError, netaddr.AddrFormatError, netaddr.AddrConversionError):
                # data["interface"] is not set at this point, so the name comes from the key
                log_warn("Subnet '%s' format is wrong for interface '%s'" % (network_str, interface_name))
                return True
            data["interface"] = interface_name
            data["prefixlen"] = str(network.prefixlen)
            ip = str(network.ip)
            self.directory.put("LOCAL", "local_addresses", ip, data)
        self.directory.put(self.db_name, self.table_name, key, data)
        self.directory.put("LOCAL", "interfaces", key, data)
        return True

    def del_handler(self, key):
        """ Implementation of 'DEL' command
        Also removes data object enrichment
        :param key: key of the table entry: either "<interface>" or "<interface>|<ip prefix>"
        """
        if '|' in key:
            interface_name, network_str = key.split('|', 1)
            try:
                network = netaddr.IPNetwork(str(network_str))
            except (netaddr.NotRegisteredError, netaddr.AddrFormatError, netaddr.AddrConversionError):
                log_warn("Subnet '%s' format is wrong for interface '%s'" % (network_str, interface_name))
                return
            ip = str(network.ip)
            self.directory.remove("LOCAL", "local_addresses", ip)
        self.directory.remove(self.db_name, self.table_name, key)
        self.directory.remove("LOCAL", "interfaces", key)
|
|
|
|
|
|
class BGPPeerGroupMgr(object):
    """ This class represents peer-group and routing policy for the peer_type """
    def __init__(self, common_objs, base_template):
        """
        Construct the object. Templates "policies.conf.j2" and "peer-group.conf.j2" are loaded
        :param common_objs: common objects. Keys 'cfg_mgr', 'constants' and 'tf' are used
        :param base_template: path to the directory with Jinja2 templates
        """
        self.cfg_mgr = common_objs['cfg_mgr']
        self.constants = common_objs['constants']
        fabric = common_objs['tf']
        self.policy_template = fabric.from_file(base_template + "policies.conf.j2")
        self.peergroup_template = fabric.from_file(base_template + "peer-group.conf.j2")

    def update(self, name, **kwargs):
        """
        Update peer-group and routing policy for the peer with the name.
        The peer-group is updated even if the update of the policy failed
        :param name: name of the peer. Used for logging only
        :param kwargs: dictionary with parameters for rendering
        :return: result of the peer-group update when the policy update succeeded,
                 result of the policy update otherwise
        """
        policy_ok = self.update_policy(name, **kwargs)
        pg_ok = self.update_pg(name, **kwargs)
        return policy_ok and pg_ok

    def update_policy(self, name, **kwargs):
        """
        Update routing policy for the peer
        :param name: name of the peer. Used for logging only
        :param kwargs: dictionary with parameters for rendering
        :return: False if the template can't be rendered, result of update_entity() otherwise
        """
        try:
            rendered = self.policy_template.render(**kwargs)
        except jinja2.TemplateError as e:
            log_err("Can't render policy template name: '%s': %s" % (name, str(e)))
            return False

        return self.update_entity(rendered, "Routing policy for peer '%s'" % name)

    def update_pg(self, name, **kwargs):
        """
        Update peer-group for the peer
        :param name: name of the peer. Used for logging only
        :param kwargs: dictionary with parameters for rendering. 'vrf' and 'bgp_asn' are required
        :return: False if the template can't be rendered, result of update_entity() otherwise
        """
        try:
            rendered = self.peergroup_template.render(**kwargs)
        except jinja2.TemplateError as e:
            log_err("Can't render peer-group template: '%s': %s" % (name, str(e)))
            return False

        # The peer-group is configured inside of the "router bgp" block of the vrf
        if kwargs['vrf'] == 'default':
            header = 'router bgp %s\n' % kwargs['bgp_asn']
        else:
            header = 'router bgp %s vrf %s\n' % (kwargs['bgp_asn'], kwargs['vrf'])

        return self.update_entity(header + rendered, "Peer-group for peer '%s'" % name)

    def update_entity(self, cmd, txt):
        """
        Send commands to FRR and log the result
        :param cmd: commands to send in a raw form
        :param txt: text for the syslog output
        :return: value returned by cfg_mgr.push()
        """
        pushed = self.cfg_mgr.push(cmd)
        if not pushed:
            log_err("Can't update %s" % txt)
        else:
            log_info("%s was updated" % txt)
        return pushed
|
|
|
|
|
|
class BGPPeerMgrBase(Manager):
|
|
""" Manager of BGP peers """
|
|
def __init__(self, common_objs, db_name, table_name, peer_type):
    """
    Initialize the object
    :param common_objs: common objects
    :param db_name: name of the db with peers
    :param table_name: name of the table with peers
    :param peer_type: type of the peers. It is used to find right templates
    """
    self.common_objs = common_objs
    self.constants = self.common_objs["constants"]
    self.fabric = common_objs['tf']
    self.peer_type = peer_type

    base_template = "bgpd/templates/" + self.constants["bgp"]["peers"][peer_type]["template_dir"] + "/"
    # "add" is loaded from the template directory of the peer type, other templates are one-liners
    self.templates = {"add": self.fabric.from_file(base_template + "instance.conf.j2")}
    for template_name, template_text in (
        ("delete", 'no neighbor {{ neighbor_addr }}'),
        ("shutdown", 'neighbor {{ neighbor_addr }} shutdown'),
        ("no shutdown", 'no neighbor {{ neighbor_addr }} shutdown'),
    ):
        self.templates[template_name] = self.fabric.from_string(template_text)

    def bgp_option(option):
        """ Value of constants['bgp'][option], False when either 'bgp' or the option is missing """
        return 'bgp' in self.constants \
            and option in self.constants['bgp'] \
            and self.constants['bgp'][option]

    self.check_neig_meta = bgp_option('use_neighbors_meta')
    self.check_deployment_id = bgp_option('use_deployment_id')

    # Peers are processed only when all of these Directory paths are available
    deps = [
        ("CONFIG_DB", swsscommon.CFG_DEVICE_METADATA_TABLE_NAME, "localhost/bgp_asn"),
        ("CONFIG_DB", swsscommon.CFG_LOOPBACK_INTERFACE_TABLE_NAME, "Loopback0"),
        ("LOCAL", "local_addresses", ""),
        ("LOCAL", "interfaces", ""),
    ]
    if self.check_neig_meta:
        deps.append(("CONFIG_DB", swsscommon.CFG_DEVICE_NEIGHBOR_METADATA_TABLE_NAME, ""))
    if self.check_deployment_id:
        deps.append(("CONFIG_DB", swsscommon.CFG_DEVICE_METADATA_TABLE_NAME, "localhost/deployment_id"))

    super(BGPPeerMgrBase, self).__init__(common_objs, deps, db_name, table_name)

    self.peers = self.load_peers()
    self.peer_group_mgr = BGPPeerGroupMgr(self.common_objs, base_template)
|
|
|
|
def set_handler(self, key, data):
    """
    It runs on 'SET' command.
    A peer which is already tracked in self.peers is updated, otherwise it is added
    :param key: key of the changed table
    :param data: the data associated with the change
    :return: result of update_peer() or add_peer()
    """
    vrf, nbr = self.split_key(key)
    # self.peers is looked up by (vrf, nbr) tuples, not by raw table keys
    if (vrf, nbr) in self.peers:
        return self.update_peer(vrf, nbr, data)
    return self.add_peer(vrf, nbr, data)
|
|
|
|
def add_peer(self, vrf, nbr, data):
    """
    Add a peer into FRR. This is used if the peer is not existed in FRR yet
    :param vrf: vrf name. Name is equal "default" for the global vrf
    :param nbr: neighbor ip address (name for dynamic peer type)
    :param data: associated data
    :return: False if the peer can't be processed yet (Loopback0 ipv4 address, the local interface
             or the neighbor metadata is missing), True otherwise. True is returned
             for a peer in a vnet, and also when rendering or applying of the configuration failed
    """
    metadata_table = swsscommon.CFG_DEVICE_METADATA_TABLE_NAME
    bgp_asn = self.directory.get_slot("CONFIG_DB", metadata_table)["localhost"]["bgp_asn"]

    lo0_ipv4 = self.get_lo0_ipv4()
    if lo0_ipv4 is None:
        log_warn("Loopback0 ipv4 address is not presented yet")
        return False

    if "local_addr" in data:
        # The bgp session that belongs to a vnet cannot be advertised as the default BGP session.
        # So we need to check whether this bgp session belongs to a vnet.
        interface = self.get_local_interface(data["local_addr"])
        if not interface:
            waiting = nbr, data["local_addr"]
            log_debug("Peer '%s' with local address '%s' wait for the corresponding interface to be set" % waiting)
            return False
        vnet = self.get_vnet(interface)
        if vnet:
            # Ignore the bgp session that is in a vnet
            log_info("Ignore the BGP peer '%s' as the interface '%s' is in vnet '%s'" % (nbr, interface, vnet))
            return True
    else:
        log_warn("Peer %s. Missing attribute 'local_addr'" % nbr)

    # Parameters for rendering of the templates
    kwargs = {
        'CONFIG_DB__DEVICE_METADATA': self.directory.get_slot("CONFIG_DB", metadata_table),
        'constants': self.constants,
        'bgp_asn': bgp_asn,
        'vrf': vrf,
        'neighbor_addr': nbr,
        'bgp_session': data,
        'loopback0_ipv4': lo0_ipv4,
    }
    if self.check_neig_meta:
        neigmeta = self.directory.get_slot("CONFIG_DB", swsscommon.CFG_DEVICE_NEIGHBOR_METADATA_TABLE_NAME)
        if 'name' in data and data["name"] not in neigmeta:
            log_info("DEVICE_NEIGHBOR_METADATA is not ready for neighbor '%s' - '%s'" % (nbr, data['name']))
            return False
        kwargs['CONFIG_DB__DEVICE_NEIGHBOR_METADATA'] = neigmeta

    # The peer-group and the policy are updated first. The result of the update is not checked here
    tag = data['name'] if 'name' in data else nbr
    self.peer_group_mgr.update(tag, **kwargs)

    try:
        cmd = self.templates["add"].render(**kwargs)
    except jinja2.TemplateError as e:
        msg = "Peer '(%s|%s)'. Error in rendering the template for 'SET' command '%s'" % (vrf, nbr, data)
        log_err("%s: %s" % (msg, str(e)))
        return True

    if cmd is not None:
        if self.apply_op(cmd, vrf):
            # The peer is tracked only after FRR accepted the configuration
            self.peers.add((vrf, nbr))
            log_info("Peer '(%s|%s)' added with attributes '%s'" % (vrf, nbr, data))
        else:
            log_err("Peer '(%s|%s)' wasn't added." % (vrf, nbr))

    return True
|
|
|
|
def update_peer(self, vrf, nbr, data):
    """
    Update a peer. This is used when the peer is already in the FRR
    Update support only "admin_status" for now
    :param vrf: vrf name. Name is equal "default" for the global vrf
    :param nbr: neighbor ip address (name for dynamic peer type)
    :param data: associated data
    :return: True always. An update without "admin_status" is logged as an error
    """
    if "admin_status" not in data:
        log_err("Peer '(%s|%s)': Can't update the peer. Only 'admin_status' attribute is supported" % (vrf, nbr))
    else:
        self.change_admin_status(vrf, nbr, data)

    return True
|
|
|
|
def change_admin_status(self, vrf, nbr, data):
    """
    Change admin status of a peer according to data['admin_status'].
    'up' applies the "no shutdown" template, 'down' applies the "shutdown" template,
    any other value is logged as an error
    :param vrf: vrf name. Name is equal "default" for the global vrf
    :param nbr: neighbor ip address (name for dynamic peer type)
    :param data: associated data
    :return: None
    """
    admin_status = data['admin_status']
    if admin_status == 'up':
        self.apply_admin_status(vrf, nbr, "no shutdown", "up")
        return
    if admin_status == 'down':
        self.apply_admin_status(vrf, nbr, "shutdown", "down")
        return
    print_data = vrf, nbr, admin_status
    log_err("Peer '%s|%s': Can't update the peer. It has wrong attribute value attr['admin_status'] = '%s'" % print_data)
|
|
|
|
def apply_admin_status(self, vrf, nbr, template_name, admin_state):
    """
    Render admin state template and apply the command to the FRR. The result is logged
    :param vrf: vrf name. Name is equal "default" for the global vrf
    :param nbr: neighbor ip address (name for dynamic peer type)
    :param template_name: name of the template to render
    :param admin_state: desired admin state. Used for logging only
    :return: None
    """
    cmd = self.templates[template_name].render(neighbor_addr=nbr)
    if self.apply_op(cmd, vrf):
        log_info("Peer '%s|%s' admin state is set to '%s'" % (vrf, nbr, admin_state))
    else:
        log_err("Can't set peer '%s|%s' admin state to '%s'." % (vrf, nbr, admin_state))
|
|
|
|
def del_handler(self, key):
    """
    'DEL' handler for the BGP PEER tables.
    The neighbor is removed from FRR first, and from self.peers only when FRR accepted the command
    :param key: key of the neighbor
    """
    vrf, nbr = self.split_key(key)
    # self.peers holds (vrf, nbr) tuples, so the raw table key can't be used for the lookup
    peer_key = (vrf, nbr)
    if peer_key not in self.peers:
        log_warn("Peer '(%s|%s)' has not been found" % (vrf, nbr))
        return
    cmd = self.templates["delete"].render(neighbor_addr=nbr)
    ret_code = self.apply_op(cmd, vrf)
    if ret_code:
        log_info("Peer '(%s|%s)' has been removed" % (vrf, nbr))
        # Remove by the tuple: removing by the raw key raised KeyError
        self.peers.remove(peer_key)
    else:
        log_err("Peer '(%s|%s)' hasn't been removed" % (vrf, nbr))
|
|
|
|
def apply_op(self, cmd, vrf):
    """
    Push commands cmd into FRR.
    The commands are prefixed with the "router bgp" line built from the local bgp_asn and the vrf
    :param cmd: commands in raw format
    :param vrf: vrf where the commands should be applied
    :return: True if no errors, False if there are errors
    """
    metadata = self.directory.get_slot("CONFIG_DB", swsscommon.CFG_DEVICE_METADATA_TABLE_NAME)
    bgp_asn = metadata["localhost"]["bgp_asn"]
    if vrf == 'default':
        header = 'router bgp %s\n' % bgp_asn
    else:
        header = 'router bgp %s vrf %s\n' % (bgp_asn, vrf)
    return self.cfg_mgr.push(header + cmd)
|
|
|
|
def get_lo0_ipv4(self):
    """
    Extract Loopback0 ipv4 address from the Directory.
    The address is taken from the first "Loopback0|<prefix>" key of the loopback interface table
    which holds an ipv4 prefix
    :return: ipv4 address for Loopback0, None if nothing found
    """
    loopbacks = self.directory.get_slot("CONFIG_DB", swsscommon.CFG_LOOPBACK_INTERFACE_TABLE_NAME)
    # Iterating the slot itself yields its keys on both Python 2 and Python 3
    for loopback in loopbacks:
        if not loopback.startswith("Loopback0|"):
            continue
        loopback0_prefix_str = loopback.replace("Loopback0|", "")
        # Keep everything before the prefix length. A key without '/' is used as it is:
        # slicing by find('/') dropped the last character of the address in that case
        loopback0_ip_str = loopback0_prefix_str.split('/', 1)[0]
        if TemplateFabric.is_ipv4(loopback0_ip_str):
            return loopback0_ip_str

    return None
|
|
|
|
def get_local_interface(self, local_addr):
    """
    Get interface according to the local address from the directory

    Both lookups use the "LOCAL" db of self.directory: the "local_addresses"
    slot maps an address to its metadata, and the "interfaces" slot maps an
    interface name to the interface metadata.
    :param local_addr: Local address of the interface
    :return: Return the metadata of the interface with the local address
             If the interface has not been set, return None
    """
    local_addresses = self.directory.get_slot("LOCAL", "local_addresses")
    # Check if the local address of this bgp session has been set
    if local_addr not in local_addresses:
        return None
    local_address = local_addresses[local_addr]
    interfaces = self.directory.get_slot("LOCAL", "interfaces")
    # Check if the information for the interface of this local address has been set.
    # The "in" operator is used because dict.has_key() does not exist in Python 3
    if "interface" in local_address and local_address["interface"] in interfaces:
        return interfaces[local_address["interface"]]
    return None
|
|
|
|
@staticmethod
def get_vnet(interface):
    """
    Get the VNet name of the interface
    :param interface: The metadata of the interface (a mapping that may carry a "vnet_name" entry)
    :return: Return the vnet name of the interface if this interface belongs to a vnet,
             Otherwise return None (also when "vnet_name" is present but empty)
    """
    # The "in" operator is used because dict.has_key() does not exist in Python 3
    if "vnet_name" in interface and interface["vnet_name"]:
        return interface["vnet_name"]
    return None
|
|
|
|
@staticmethod
def split_key(key):
    """
    Break a table key into a vrf name and a peer ip address.
    A key without '|' belongs to the "default" vrf; otherwise the text before
    the first '|' is the vrf and everything after it is the peer address.
    :param key: key to split
    :return: tuple of (vrf name, peer ip address)
    """
    vrf, separator, peer = key.partition('|')
    if separator:
        return vrf, peer
    return 'default', key
|
|
|
|
@staticmethod
def load_peers():
    """
    Ask FRR (through vtysh) which neighbors exist in every bgp vrf.
    :return: set of (vrf, neighbor) tuples, one per neighbor reported by FRR
    :raise Exception: when either vtysh command exits with a non-zero code
    """
    ret_code, out, err = run_command(["vtysh", "-c", "show bgp vrfs json"])
    if ret_code != 0:
        log_crit("Can't read bgp vrfs: %s" % err)
        raise Exception("Can't read bgp vrfs: %s" % err)
    vrfs = json.loads(out)['vrfs'].keys()

    peers = set()
    for vrf in vrfs:
        neighbors_cmd = ["vtysh", "-c", 'show bgp vrf %s neighbors json' % str(vrf)]
        ret_code, out, err = run_command(neighbors_cmd)
        if ret_code != 0:
            log_crit("Can't read vrf '%s' neighbors: %s" % (vrf, str(err)))
            raise Exception("Can't read vrf '%s' neighbors: %s" % (vrf, str(err)))
        # Every top-level key of the neighbors json is a neighbor of this vrf
        peers.update((vrf, nbr) for nbr in json.loads(out).keys())

    return peers
|
|
|
|
|
|
class ZebraSetSrc(Manager):
    """ Manager which pushes the zebra "set src" configuration for Loopback0 addresses """

    def __init__(self, common_objs, db, table):
        """
        Set up the manager and load the "set src" template
        :param common_objs: common object dictionary (its 'tf' entry loads the template)
        :param db: name of the db
        :param table: name of the table in the db
        """
        super(ZebraSetSrc, self).__init__(common_objs, [], db, table)
        self.zebra_set_src_template = common_objs['tf'].from_file("zebra/zebra.set_src.conf.j2")
        # Loopback0 addresses already handled, one per address family
        self.lo_ipv4 = None
        self.lo_ipv6 = None

    def set_handler(self, key, data):
        """
        'SET' handler: store the entry in the directory and, for a Loopback0 entry
        whose state is "ok", render and push the "set src" configuration.
        Only the first ipv4 and the first ipv6 address are used.
        :param key: table key, "Loopback0|<ip>/<mask>" for the entries of interest
        :param data: attributes of the entry
        :return: always True
        """
        self.directory.put(self.db_name, self.table_name, key, data)

        # Anything but a Loopback0 entry in the "ok" state is only recorded
        if not key.startswith("Loopback0|") or "state" not in data or data["state"] != "ok":
            return True

        ip_addr_w_mask = key.replace("Loopback0|", "")
        # Split on the last '/': everything before it is the address
        ip_addr, slash, _ = ip_addr_w_mask.rpartition("/")
        if not slash:
            log_err("Wrong Loopback0 ip address: '%s'" % ip_addr_w_mask)
            return True

        try:
            if TemplateFabric.is_ipv4(ip_addr) and self.lo_ipv4 is None:
                self.lo_ipv4 = ip_addr
                rm_name, ip_proto = "RM_SET_SRC", ""
            elif TemplateFabric.is_ipv6(ip_addr) and self.lo_ipv6 is None:
                self.lo_ipv6 = ip_addr
                rm_name, ip_proto = "RM_SET_SRC6", "v6"
            else:
                log_err("Got ambiguous ip address '%s'" % ip_addr)
                return True
            txt = self.zebra_set_src_template.render(rm_name=rm_name, lo_ip=ip_addr, ip_proto=ip_proto)
        except jinja2.TemplateError as e:
            log_err("Error while rendering 'set src' template: %s" % str(e))
            return True

        pushed = self.cfg_mgr.push(txt)
        if pushed:
            log_info("The 'set src' configuration with Loopback0 ip '%s' was pushed" % ip_addr)
        else:
            log_err("The 'set src' configuration with Loopback0 ip '%s' wasn't pushed" % ip_addr)
        return True

    def del_handler(self, key):
        """
        'DEL' handler: forget the entry in the directory. The pushed "set src"
        configuration is left untouched, only a warning is logged.
        :param key: table key of the removed entry
        """
        self.directory.remove(self.db_name, self.table_name, key)
        log_warn("Delete command is not supported for 'zebra set src' templates")
|
|
|
|
|
|
def wait_for_daemons(daemons, seconds):
    """
    Poll "vtysh -c 'show daemons'" until every requested FRR daemon is listed
    :param daemons: list of FRR daemons to wait
    :param seconds: number of seconds to wait, until raise an error
    :raise RuntimeError: when the daemons are still missing after the given number of seconds
    """
    deadline = datetime.datetime.now() + datetime.timedelta(seconds=seconds)
    log_info("Start waiting for FRR daemons: %s" % str(datetime.datetime.now()))
    while datetime.datetime.now() < deadline:
        ret_code, out, err = run_command(["vtysh", "-c", "show daemons"], hide_errors=True)
        if ret_code != 0 or not all(daemon in out for daemon in daemons):
            log_warn("Can't read daemon status from FRR: %s" % str(err))
            time.sleep(0.1)  # sleep 100 ms
            continue
        log_info("All required daemons have connected to vtysh: %s" % str(datetime.datetime.now()))
        return
    raise RuntimeError("FRR daemons hasn't been started in %d seconds" % seconds)
|
|
|
|
|
|
def read_constants():
    """
    Read file with constants values from /etc/sonic/constants.yml
    :return: the value stored under the top-level 'constants' key of the file
    :raise Exception: when the file has no top-level 'constants' key
                      (including an empty file or a non-mapping document)
    """
    with open('/etc/sonic/constants.yml') as fp:
        # safe_load() builds plain python objects only and, unlike a bare
        # yaml.load(fp), does not depend on a Loader argument being supplied
        content = yaml.safe_load(fp)
    # An empty file is loaded as None, so check the type before looking for the key
    if not isinstance(content, dict) or "constants" not in content:
        log_crit("/etc/sonic/constants.yml doesn't have 'constants' key")
        raise Exception("/etc/sonic/constants.yml doesn't have 'constants' key")
    return content["constants"]
|
|
|
|
|
|
def main():
    """ Wait for the FRR daemons, create every manager and run them through a Runner """
    wait_for_daemons(["bgpd", "zebra", "staticd"], seconds=20)

    # Objects handed to every manager
    shared = {
        'directory': Directory(),
        'cfg_mgr': ConfigMgr(),
        'tf': TemplateFabric(),
        'constants': read_constants(),
    }

    all_managers = [
        # Config DB managers
        BGPDataBaseMgr(shared, "CONFIG_DB", swsscommon.CFG_DEVICE_METADATA_TABLE_NAME),
        BGPDataBaseMgr(shared, "CONFIG_DB", swsscommon.CFG_DEVICE_NEIGHBOR_METADATA_TABLE_NAME),
        # Interface managers
        InterfaceMgr(shared, "CONFIG_DB", swsscommon.CFG_INTF_TABLE_NAME),
        InterfaceMgr(shared, "CONFIG_DB", swsscommon.CFG_LOOPBACK_INTERFACE_TABLE_NAME),
        InterfaceMgr(shared, "CONFIG_DB", swsscommon.CFG_VLAN_INTF_TABLE_NAME),
        InterfaceMgr(shared, "CONFIG_DB", swsscommon.CFG_LAG_INTF_TABLE_NAME),
        # State DB managers
        ZebraSetSrc(shared, "STATE_DB", swsscommon.STATE_INTERFACE_TABLE_NAME),
        # Peer Managers
        BGPPeerMgrBase(shared, "CONFIG_DB", swsscommon.CFG_BGP_NEIGHBOR_TABLE_NAME, "general"),
        BGPPeerMgrBase(shared, "CONFIG_DB", "BGP_MONITORS", "monitors"),
        BGPPeerMgrBase(shared, "CONFIG_DB", "BGP_PEER_RANGE", "dynamic"),
    ]

    # Managers are registered in the order they were created
    runner = Runner()
    for manager in all_managers:
        runner.add_manager(manager)
    runner.run()
|
|
|
|
|
|
def signal_handler(_, __):  # signal_handler(signum, frame)
    """ Clear the module-level g_run flag; both handler arguments are ignored """
    global g_run
    g_run = False
|
|
|
|
|
|
if __name__ == '__main__':
    # Exit code: 0 on a clean stop, -2 on RuntimeError, -1 on any other exception
    rc = 0
    try:
        syslog.openlog('bgpcfgd')
        # SIGTERM and SIGINT share one handler
        for signum in (signal.SIGTERM, signal.SIGINT):
            signal.signal(signum, signal_handler)
        main()
    except KeyboardInterrupt:
        log_notice("Keyboard interrupt")
    except RuntimeError as exc:
        log_crit(str(exc))
        rc = -2
        # In debug mode let the exception propagate after it has been logged
        if g_debug:
            raise
    except Exception as exc:
        log_crit("Got an exception %s: Traceback: %s" % (str(exc), traceback.format_exc()))
        rc = -1
        if g_debug:
            raise
    finally:
        syslog.closelog()
    # If sys.exit() is intercepted as SystemExit, terminate the process directly
    try:
        sys.exit(rc)
    except SystemExit:
        os._exit(rc)
|