[docker-fpm-frr]: Refactor bgpcfgd (#3789)

This commit is contained in:
pavel-shirshov 2019-11-22 11:07:36 -08:00 committed by GitHub
parent a5423357d5
commit a73eb66546
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -8,11 +8,11 @@ import syslog
import signal import signal
import traceback import traceback
import os import os
import shutil
import tempfile import tempfile
import json import json
from collections import defaultdict from collections import defaultdict
from pprint import pprint from pprint import pprint
from pprint import pformat
import jinja2 import jinja2
import netaddr import netaddr
@ -34,6 +34,7 @@ def run_command(command, shell=False):
return p.returncode, stdout, stderr return p.returncode, stdout, stderr
class TemplateFabric(object): class TemplateFabric(object):
def __init__(self): def __init__(self):
j2_template_paths = ['/usr/share/sonic/templates'] j2_template_paths = ['/usr/share/sonic/templates']
@ -76,130 +77,18 @@ class TemplateFabric(object):
return addr.version == 6 return addr.version == 6
class BGPConfigManager(object):
def __init__(self, daemon):
self.bgp_asn = None
self.meta = None
self.neig_meta = {}
self.bgp_messages = []
self.peers = self.load_peers() # we can have bgp monitors peers here. it could be fixed by adding support for it here
fabric = TemplateFabric()
self.bgp_peer_add_template = fabric.from_file('bgpd.peer.conf.j2')
self.bgp_peer_del_template = fabric.from_string('no neighbor {{ neighbor_addr }}')
self.bgp_peer_shutdown = fabric.from_string('neighbor {{ neighbor_addr }} shutdown')
self.bgp_peer_no_shutdown = fabric.from_string('no neighbor {{ neighbor_addr }} shutdown')
daemon.add_manager(swsscommon.CONFIG_DB, swsscommon.CFG_DEVICE_METADATA_TABLE_NAME, self.__metadata_handler)
daemon.add_manager(swsscommon.CONFIG_DB, swsscommon.CFG_DEVICE_NEIGHBOR_METADATA_TABLE_NAME, self.__neighbor_metadata_handler)
daemon.add_manager(swsscommon.CONFIG_DB, swsscommon.CFG_BGP_NEIGHBOR_TABLE_NAME, self.__bgp_handler)
def load_peers(self):
peers = set()
command = ["vtysh", "-c", "show bgp neighbors json"]
rc, out, err = run_command(command)
if rc == 0:
js_bgp = json.loads(out)
peers = set(js_bgp.keys())
return peers
def __metadata_handler(self, key, op, data):
if key != "localhost" \
or "bgp_asn" not in data \
or self.bgp_asn == data["bgp_asn"]:
return
# TODO add ASN update commands
self.meta = { 'localhost': data }
self.bgp_asn = data["bgp_asn"]
self.__update_bgp()
def __neighbor_metadata_handler(self, key, op, data):
if op == swsscommon.SET_COMMAND:
self.neig_meta[key] = data
elif op == swsscommon.DEL_COMMAND:
if key in self.neig_meta:
del self.neig_meta[key]
else:
syslog.syslog(syslog.LOG_ERR,"Can't remove key '%s' from neighbor metadata handler. The key doesn't exist" % key)
else:
syslog.syslog(syslog.LOG_ERR,"Wrong operation '%s' for neighbor metadata handler" % op)
self.__update_bgp()
def __update_bgp(self):
cmds = []
new_bgp_messages = []
for key, op, data in self.bgp_messages:
if op == swsscommon.SET_COMMAND:
if key not in self.peers:
if 'name' in data and data['name'] not in self.neig_meta:
# DEVICE_NEIGHBOR_METADATA should be populated before the rendering
new_bgp_messages.append((key, op, data))
continue
try:
txt = self.bgp_peer_add_template.render(DEVICE_METADATA=self.meta, DEVICE_NEIGHBOR_METADATA=self.neig_meta, neighbor_addr=key, bgp_session=data)
cmds.append(txt)
except:
syslog.syslog(syslog.LOG_ERR, 'Peer {}. Error in rendering the template for "SET" command {}'.format(key, data))
else:
syslog.syslog(syslog.LOG_INFO, 'Peer {} added with attributes {}'.format(key, data))
self.peers.add(key)
else:
# when the peer is already configured we support "shutdown/no shutdown"
# commands for the peers only
if "admin_status" in data:
if data['admin_status'] == 'up':
cmds.append(self.bgp_peer_no_shutdown.render(neighbor_addr=key))
syslog.syslog(syslog.LOG_INFO, 'Peer {} admin state is set to "up"'.format(key))
elif data['admin_status'] == 'down':
cmds.append(self.bgp_peer_shutdown.render(neighbor_addr=key))
syslog.syslog(syslog.LOG_INFO, 'Peer {} admin state is set to "down"'.format(key))
else:
syslog.syslog(syslog.LOG_ERR, "Peer {}: Can't update the peer. has wrong attribute value attr['admin_status'] = '{}'".format(key, data['admin_status']))
else:
syslog.syslog(syslog.LOG_INFO, "Peer {}: Can't update the peer. No 'admin_status' attribute in the request".format(key))
elif op == swsscommon.DEL_COMMAND:
if key in self.peers:
cmds.append(self.bgp_peer_del_template.render(neighbor_addr=key))
syslog.syslog(syslog.LOG_INFO, 'Peer {} has been removed'.format(key))
self.peers.remove(key)
else:
syslog.syslog(syslog.LOG_WARNING, 'Peer {} is not found'.format(key))
self.bgp_messages = new_bgp_messages
if len(cmds) == 0:
return
fd, tmp_filename = tempfile.mkstemp(dir='/tmp')
os.close(fd)
with open (tmp_filename, 'w') as fp:
fp.write('router bgp %s\n' % self.bgp_asn)
for cmd in cmds:
fp.write("%s\n" % cmd)
command = ["vtysh", "-f", tmp_filename]
run_command(command) #FIXME
os.remove(tmp_filename)
def __bgp_handler(self, key, op, data):
self.bgp_messages.append((key, op, data))
# If ASN is not set, we just cache this message until the ASN is set.
if self.bgp_asn is not None:
self.__update_bgp()
class Daemon(object): class Daemon(object):
SELECT_TIMEOUT = 1000 SELECT_TIMEOUT = 1000
DATABASE_LIST = [ swsscommon.CONFIG_DB ]
def __init__(self): def __init__(self):
self.db_connectors = { db : swsscommon.DBConnector(db, swsscommon.DBConnector.DEFAULT_UNIXSOCKET, 0) for db in Daemon.DATABASE_LIST } self.db_connectors = {}
self.selector = swsscommon.Select() self.selector = swsscommon.Select()
self.callbacks = defaultdict(lambda : defaultdict(list)) # db -> table -> [] self.callbacks = defaultdict(lambda : defaultdict(list)) # db -> table -> []
self.subscribers = set() self.subscribers = set()
def add_manager(self, db, table_name, callback): def add_manager(self, db, table_name, callback):
if db not in Daemon.DATABASE_LIST: if db not in self.db_connectors:
raise ValueError("database {} isn't supported. Supported '{}' only.".format(db, ",".join(Daemon.DATABASE_LIST))) self.db_connectors[db] = swsscommon.DBConnector(db, swsscommon.DBConnector.DEFAULT_UNIXSOCKET, 0)
if table_name not in self.callbacks[db]: if table_name not in self.callbacks[db]:
conn = self.db_connectors[db] conn = self.db_connectors[db]
@ -226,6 +115,260 @@ class Daemon(object):
callback(key, op, dict(fvs)) callback(key, op, dict(fvs))
class Directory(object):
def __init__(self):
self.data = defaultdict(dict)
self.notify = defaultdict(lambda: defaultdict(list))
def path_exist(self, slot, path):
if slot not in self.data:
return False
elif path == '':
return True
d = self.data[slot]
for p in path.split("/"):
if p not in d:
return False
d = d[p]
return True
def get_path(self, slot, path):
if slot not in self.data:
return None
elif path == '':
return self.data[slot]
d = self.data[slot]
for p in path.split("/"):
if p not in d:
return None
d = d[p]
return d
def put(self, slot, key, value):
self.data[slot][key] = value
if slot in self.notify:
for path in self.notify[slot].keys():
if self.path_exist(slot, path):
for handler in self.notify[slot][path]:
handler()
def get(self, slot, key):
return self.data[slot][key]
def remove(self, slot, key):
if slot in self.data:
if key in self.data[slot]:
del self.data[slot][key]
else:
syslog.syslog(syslog.LOG_ERR, "Directory: Can't remove key '%s' from slot '%s'. The key doesn't exist" % (key, slot))
else:
syslog.syslog(syslog.LOG_ERR, "Directory: Can't remove key '%s' from slot '%s'. The slot doesn't exist" % (key, slot))
def remove_slot(self, slot, key):
if slot in self.data:
del self.data[slot]
else:
syslog.syslog(syslog.LOG_ERR, "Directory: Can't remove slot '%s'. The slot doesn't exist" % slot)
def get_slot(self, slot):
return self.data[slot]
def available_slot(self, slot):
return slot in self.data
def available_deps(self, deps):
res = True
for slot, path in deps:
res = res and self.path_exist(slot, path)
return res
def subscribe(self, deps, handler):
for slot, path in deps:
self.notify[slot][path].append(handler)
class Manager(object):
def __init__(self, daemon, directory, deps, database, table_name):
self.directory = directory
self.deps = deps
self.set_queue = []
daemon.add_manager(database, table_name, self.handler)
directory.subscribe(deps, self.on_deps_change)
def handler(self, key, op, data):
if op == swsscommon.SET_COMMAND:
if self.directory.available_deps(self.deps):
res = self.set_handler(key, data)
if not res:
self.set_queue.append((key, data))
else:
self.set_queue.append((key, data))
elif op == swsscommon.DEL_COMMAND:
self.del_handler(key)
else:
syslog.syslog(syslog.LOG_ERR, 'Invalid operation "%s" for key "%s"' % (op, key))
def on_deps_change(self):
new_queue = []
for key, data in self.set_queue:
res = self.set_handler(key, data)
if not res:
new_queue.append((key, data))
self.set_queue = new_queue
def set_handler(self, key, data):
syslog.syslog(syslog.LOG_ERR, "%s wasn't implemented for %s" % (self.__name__, self.__class__))
def del_handler(self, key):
syslog.syslog(syslog.LOG_ERR, "%s wasn't implemented for %s" % (self.__name__, self.__class__))
class BGPDeviceMetaMgr(Manager):
def __init__(self, daemon, directory):
super(BGPDeviceMetaMgr, self).__init__(
daemon,
directory,
[],
swsscommon.CONFIG_DB,
swsscommon.CFG_DEVICE_METADATA_TABLE_NAME
)
def set_handler(self, key, data):
if key != "localhost" or "bgp_asn" not in data:
return
if self.directory.path_exist("meta", "localhost/bgp_asn"):
bgp_asn = self.directory.get_path("meta", "localhost/bgp_asn")
if bgp_asn == data["bgp_asn"]:
return
self.directory.put("meta", key, data)
return True
def del_handler(self, key):
self.directory.remove("meta", key)
class BGPNeighborMetaMgr(Manager):
def __init__(self, daemon, directory):
super(BGPNeighborMetaMgr, self).__init__(
daemon,
directory,
[],
swsscommon.CONFIG_DB,
swsscommon.CFG_DEVICE_NEIGHBOR_METADATA_TABLE_NAME
)
def set_handler(self, key, data):
self.directory.put("neigmeta", key, data)
return True
def del_handler(self, key):
self.directory.remove("neigmeta", key)
class BGPPeerMgr(Manager):
def __init__(self, daemon, directory):
super(BGPPeerMgr, self).__init__(
daemon,
directory,
[
("meta", "localhost/bgp_asn"),
("neigmeta", ""),
],
swsscommon.CONFIG_DB,
swsscommon.CFG_BGP_NEIGHBOR_TABLE_NAME
)
self.peers = self.load_peers()
fabric = TemplateFabric()
self.templates = {
"add": fabric.from_file('bgpd.peer.conf.j2'),
"delete": fabric.from_string('no neighbor {{ neighbor_addr }}'),
"shutdown": fabric.from_string('neighbor {{ neighbor_addr }} shutdown'),
"no shutdown": fabric.from_string('no neighbor {{ neighbor_addr }} shutdown'),
}
def set_handler(self, key, data):
if key not in self.peers:
cmd = None
neigmeta = self.directory.get_slot("neigmeta")
if 'name' in data and data["name"] not in neigmeta:
return False
try:
cmd = self.templates["add"].render(
DEVICE_METADATA=self.directory.get_slot("meta"),
DEVICE_NEIGHBOR_METADATA=neigmeta,
neighbor_addr=key,
bgp_session=data
)
except:
syslog.syslog(syslog.LOG_ERR, 'Peer {}. Error in rendering the template for "SET" command {}'.format(key, data))
return True
if cmd is not None:
rc = self.apply_op(cmd)
if rc:
self.peers.add(key)
syslog.syslog(syslog.LOG_INFO, 'Peer {} added with attributes {}'.format(key, data))
else:
syslog.syslog(syslog.LOG_ERR, "Peer {} wasn't added.".format(key))
else:
# when the peer is already configured we support "shutdown/no shutdown"
# commands for the peers only
if "admin_status" in data:
if data['admin_status'] == 'up':
rc = self.apply_op(self.templates["no shutdown"].render(neighbor_addr=key))
if rc:
syslog.syslog(syslog.LOG_INFO, 'Peer {} admin state is set to "up"'.format(key))
else:
syslog.syslog(syslog.LOG_ERR, "Peer {} admin state wasn't set to 'up'.".format(key))
elif data['admin_status'] == 'down':
rc = self.apply_op(self.templates["shutdown"].render(neighbor_addr=key))
if rc:
syslog.syslog(syslog.LOG_INFO, 'Peer {} admin state is set to "down"'.format(key))
else:
syslog.syslog(syslog.LOG_ERR, "Peer {} admin state wasn't set to 'down'.".format(key))
else:
syslog.syslog(syslog.LOG_ERR, "Peer {}: Can't update the peer. has wrong attribute value attr['admin_status'] = '{}'".format(key, data['admin_status']))
else:
syslog.syslog(syslog.LOG_ERR, "Peer {}: Can't update the peer. No 'admin_status' attribute in the request".format(key))
return True
def del_handler(self, key):
if key not in self.peers:
syslog.syslog(syslog.LOG_WARNING, 'Peer {} has not been found'.format(key))
return
cmd = self.templates["delete"].render(neighbor_addr=key)
rc = self.apply_op(cmd)
if rc:
syslog.syslog(syslog.LOG_INFO, 'Peer {} has been removed'.format(key))
self.peers.remove(key)
else:
syslog.syslog(syslog.LOG_ERR, "Peer {} hasn't been removed".format(key))
def apply_op(self, cmd):
bgp_asn = self.directory.get_slot("meta")["localhost"]["bgp_asn"]
fd, tmp_filename = tempfile.mkstemp(dir='/tmp')
os.close(fd)
with open(tmp_filename, 'w') as fp:
fp.write('router bgp %s\n' % bgp_asn)
fp.write("%s\n" % cmd)
command = ["vtysh", "-f", tmp_filename]
rc, _, _ = run_command(command)
os.remove(tmp_filename)
return rc == 0
@staticmethod
def load_peers():
peers = set()
command = ["vtysh", "-c", "show bgp neighbors json"]
rc, out, err = run_command(command)
if rc == 0:
js_bgp = json.loads(out)
peers = set(js_bgp.keys())
return peers
def wait_for_bgpd(): def wait_for_bgpd():
# wait for 20 seconds # wait for 20 seconds
stop_time = datetime.datetime.now() + datetime.timedelta(seconds=20) stop_time = datetime.datetime.now() + datetime.timedelta(seconds=20)
@ -240,9 +383,15 @@ def wait_for_bgpd():
def main(): def main():
managers = [
BGPDeviceMetaMgr,
BGPNeighborMetaMgr,
BGPPeerMgr,
]
wait_for_bgpd() wait_for_bgpd()
daemon = Daemon() daemon = Daemon()
bgp_manager = BGPConfigManager(daemon) directory = Directory()
manager_instanses = [ manager(daemon, directory) for manager in managers ]
daemon.run() daemon.run()