sonic-buildimage/dockers/docker-fpm-frr/bgpcfgd
Dong Zhang a2656416fe [MultiDB] (./dockers dir) : replace redis-cli with sonic-db-cli and use new DBConnector (#3923)
* [MultiDB] (./dockers dirs): replace redis-cli with sonic-db-cli and use new DBConnector

* remove unnecessary quotes

* update typo
2020-02-03 15:33:30 -08:00

445 lines
15 KiB
Python
Executable File

#!/usr/bin/env python
import sys
import subprocess
import datetime
import time
import syslog
import signal
import traceback
import os
import tempfile
import json
from collections import defaultdict
from pprint import pprint
from pprint import pformat
import jinja2
import netaddr
from swsscommon import swsscommon
# Main-loop flag: Daemon.run() keeps polling while it is True; signal_handler() sets it to False.
g_run = True
# When True, executed commands and received messages are additionally logged at LOG_DEBUG level.
g_debug = False
def run_command(command, shell=False):
    """
    Run an external command and capture its output.

    :param command: the command to run; either a sequence of arguments or a
        single string (a string is the form subprocess expects with shell=True)
    :param shell: passed through to subprocess.Popen
    :return: tuple (return code, captured stdout, captured stderr)
    """
    # Printable form of the command for the log messages. Joining a plain
    # string would insert a space between every character, so only real
    # argument sequences are joined.
    if isinstance(command, (list, tuple)):
        str_cmd = " ".join(command)
    else:
        str_cmd = command
    if g_debug:
        syslog.syslog(syslog.LOG_DEBUG, "execute command {}.".format(str_cmd))
    p = subprocess.Popen(command, shell=shell, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = p.communicate()
    if p.returncode != 0:
        # A failure is only logged here; the caller decides what to do with it
        syslog.syslog(syslog.LOG_ERR, 'command execution returned {}. Command: "{}", stdout: "{}", stderr: "{}"'.format(p.returncode, str_cmd, stdout, stderr))
    return p.returncode, stdout, stderr
class TemplateFabric(object):
    """
    Factory of jinja2 templates.

    All templates share one jinja2 environment which loads files from
    /usr/share/sonic/templates and provides the 'ipv4' and 'ipv6' filters.
    """

    def __init__(self):
        j2_template_paths = ['/usr/share/sonic/templates']
        j2_loader = jinja2.FileSystemLoader(j2_template_paths)
        j2_env = jinja2.Environment(loader=j2_loader, trim_blocks=True)
        j2_env.filters['ipv4'] = self.is_ipv4
        j2_env.filters['ipv6'] = self.is_ipv6
        self.env = j2_env

    def from_file(self, filename):
        """
        Load a template from a file found by the environment loader.

        :param filename: name of the template file
        :return: jinja2 template object
        """
        return self.env.get_template(filename)

    def from_string(self, tmpl):
        """
        Build a template from its source text.

        :param tmpl: template source
        :return: jinja2 template object
        """
        return self.env.from_string(tmpl)

    @staticmethod
    def _ip_version(value):
        """
        Return the IP version (4 or 6) of value, or None if it can't be determined.

        :param value: a netaddr.IPNetwork, or anything whose str() is an address/prefix
        """
        if not value:
            return None
        if isinstance(value, netaddr.IPNetwork):
            return value.version
        try:
            return netaddr.IPNetwork(str(value)).version
        except Exception:
            # Anything which can't be parsed is "not an address". Unlike a bare
            # except, this doesn't swallow SystemExit / KeyboardInterrupt.
            return None

    @staticmethod
    def is_ipv4(value):
        """
        jinja2 filter: True if value is an IPv4 address or prefix.

        :param value: value to check
        :return: True for IPv4; False for empty, unparsable or IPv6 values
        """
        return TemplateFabric._ip_version(value) == 4

    @staticmethod
    def is_ipv6(value):
        """
        jinja2 filter: True if value is an IPv6 address or prefix.

        :param value: value to check
        :return: True for IPv6; False for empty, unparsable or IPv4 values
        """
        return TemplateFabric._ip_version(value) == 6
class Daemon(object):
    """
    Select loop which passes table updates to the registered callbacks.

    Every (database, table) pair gets one swsscommon.SubscriberStateTable;
    any number of callbacks may be attached to it.
    """

    # Timeout handed to Select.select(); presumably milliseconds - confirm against swsscommon
    SELECT_TIMEOUT = 1000

    def __init__(self):
        self.db_connectors = {}
        self.selector = swsscommon.Select()
        self.callbacks = defaultdict(lambda: defaultdict(list))  # db -> table -> []
        self.subscribers = set()

    def add_manager(self, db_name, table_name, callback):
        """
        Attach a callback to a table.

        :param db_name: name of the database, e.g. "CONFIG_DB"
        :param table_name: name of the table to subscribe to
        :param callback: callable invoked as callback(key, op, data)
        """
        db_id = swsscommon.SonicDBConfig.getDbId(db_name)
        if db_id not in self.db_connectors:
            self.db_connectors[db_id] = swsscommon.DBConnector(db_name, 0)
        per_table = self.callbacks[db_id]
        if table_name not in per_table:
            # First callback for this table: create the subscription
            table = swsscommon.SubscriberStateTable(self.db_connectors[db_id], table_name)
            self.subscribers.add(table)
            self.selector.addSelectable(table)
        per_table[table_name].append(callback)

    def run(self):
        """
        Poll the subscriptions until the global g_run flag is cleared.

        :raises Exception: when select reports an error
        """
        while g_run:
            state, _ = self.selector.select(Daemon.SELECT_TIMEOUT)
            if state == self.selector.TIMEOUT:
                continue
            if state == self.selector.ERROR:
                raise Exception("Received error from select")
            for subscriber in self.subscribers:
                self._dispatch(subscriber)

    def _dispatch(self, subscriber):
        """Pop one message from the subscriber and pass it to the callbacks of its table."""
        key, op, fvs = subscriber.pop()
        if not key:
            return
        if g_debug:
            syslog.syslog(syslog.LOG_DEBUG, "Received message : {}".format((key, op, fvs)))
        db_id = subscriber.getDbConnector().getDbId()
        table_name = subscriber.getTableName()
        for callback in self.callbacks[db_id][table_name]:
            callback(key, op, dict(fvs))
class Directory(object):
    """
    In-memory store organised as slot -> key -> value, with notifications.

    Handlers subscribed to a (slot, path) pair are called on every put() into
    the slot, as long as the path exists in the slot after the put.
    """

    def __init__(self):
        self.data = defaultdict(dict)  # slot -> key -> value
        self.notify = defaultdict(lambda: defaultdict(list))  # slot -> path -> [handlers]

    def path_traverse(self, slot, path):
        """
        Follow a "/"-separated path inside a slot.

        :param slot: name of the slot
        :param path: "/"-separated keys; '' means the slot itself
        :return: tuple (True, value) if the path exists, (False, None) otherwise
        """
        if slot not in self.data:
            return False, None
        elif path == '':
            return True, self.data[slot]
        d = self.data[slot]
        for p in path.split("/"):
            # Only dictionaries can be descended into. Without the type check a
            # string leaf would be searched for a substring and then indexed by
            # a string, which raises TypeError.
            if not isinstance(d, dict) or p not in d:
                return False, None
            d = d[p]
        return True, d

    def path_exist(self, slot, path):
        """Return True if the path exists in the slot."""
        return self.path_traverse(slot, path)[0]

    def get_path(self, slot, path):
        """Return the value found at the path in the slot, or None if the path doesn't exist."""
        return self.path_traverse(slot, path)[1]

    def put(self, slot, key, value):
        """
        Store a value and call the handlers whose path now exists in the slot.

        :param slot: name of the slot
        :param key: key inside the slot
        :param value: value to store
        """
        self.data[slot][key] = value
        if slot in self.notify:
            # Iterate over a snapshot of the paths, so a handler which subscribes
            # to a new path doesn't break the iteration
            for path in list(self.notify[slot]):
                if self.path_exist(slot, path):
                    for handler in self.notify[slot][path]:
                        handler()

    def get(self, slot, key):
        """
        Return the value stored under key in the slot.

        :raises KeyError: if the key is not in the slot
        """
        # NOTE: self.data is a defaultdict, so this creates the slot if it is missing
        return self.data[slot][key]

    def remove(self, slot, key):
        """Remove a key from the slot. A missing key or slot is only logged."""
        if slot in self.data:
            if key in self.data[slot]:
                del self.data[slot][key]
            else:
                syslog.syslog(syslog.LOG_ERR, "Directory: Can't remove key '%s' from slot '%s'. The key doesn't exist" % (key, slot))
        else:
            syslog.syslog(syslog.LOG_ERR, "Directory: Can't remove key '%s' from slot '%s'. The slot doesn't exist" % (key, slot))

    def remove_slot(self, slot, key=None):
        """
        Remove a whole slot. A missing slot is only logged.

        :param slot: name of the slot
        :param key: not used; kept (now optional) for backward compatibility
        """
        if slot in self.data:
            del self.data[slot]
        else:
            syslog.syslog(syslog.LOG_ERR, "Directory: Can't remove slot '%s'. The slot doesn't exist" % slot)

    def get_slot(self, slot):
        """Return the dictionary of the slot."""
        # NOTE: self.data is a defaultdict, so this creates the slot if it is missing
        return self.data[slot]

    def available_slot(self, slot):
        """Return True if the slot exists."""
        return slot in self.data

    def available_deps(self, deps):
        """
        Check that all dependencies exist.

        :param deps: iterable of (slot, path) pairs
        :return: True if every path exists; True for an empty deps
        """
        return all(self.path_exist(slot, path) for slot, path in deps)

    def subscribe(self, deps, handler):
        """
        Register a handler for each of the dependencies.

        :param deps: iterable of (slot, path) pairs
        :param handler: callable without arguments
        """
        for slot, path in deps:
            self.notify[slot][path].append(handler)
class Manager(object):
    """
    Base class of the table managers.

    A manager registers itself in the daemon for one table. SET requests which
    can't be served yet are kept in a queue and replayed when the dependencies
    of the manager appear in the directory.
    """

    def __init__(self, daemon, directory, deps, database, table_name):
        """
        :param daemon: Daemon object, used to subscribe to the table
        :param directory: Directory object shared between the managers
        :param deps: list of (slot, path) pairs required before SET requests are served
        :param database: name of the database
        :param table_name: name of the table
        """
        self.directory = directory
        self.deps = deps
        self.set_queue = []
        daemon.add_manager(database, table_name, self.handler)
        directory.subscribe(deps, self.on_deps_change)

    def handler(self, key, op, data):
        """
        Entry point for the messages of the table.

        :param key: key of the entry
        :param op: operation, swsscommon.SET_COMMAND or swsscommon.DEL_COMMAND
        :param data: dictionary with the attributes of the entry
        """
        if op == swsscommon.SET_COMMAND:
            if self.directory.available_deps(self.deps):
                res = self.set_handler(key, data)
                if not res:
                    # the handler asked to try this entry again later
                    self.set_queue.append((key, data))
            else:
                self.set_queue.append((key, data))
        elif op == swsscommon.DEL_COMMAND:
            self.del_handler(key)
        else:
            syslog.syslog(syslog.LOG_ERR, 'Invalid operation "%s" for key "%s"' % (op, key))

    def on_deps_change(self):
        """Replay the queued SET requests once all dependencies are available."""
        if not self.directory.available_deps(self.deps):
            return
        new_queue = []
        for key, data in self.set_queue:
            res = self.set_handler(key, data)
            if not res:
                new_queue.append((key, data))
        self.set_queue = new_queue

    def set_handler(self, key, data):
        """
        Serve a SET request. Must be overridden by the subclass.

        :return: a true value if the request doesn't need to be retried. This
            default implementation only logs an error and returns None.
        """
        # Instances have no __name__ attribute (the original lookup raised
        # AttributeError), so the method name is spelled out
        syslog.syslog(syslog.LOG_ERR, "%s wasn't implemented for %s" % ("set_handler", self.__class__))

    def del_handler(self, key):
        """Serve a DEL request. Must be overridden by the subclass; the default only logs an error."""
        syslog.syslog(syslog.LOG_ERR, "%s wasn't implemented for %s" % ("del_handler", self.__class__))
class BGPDeviceMetaMgr(Manager):
    """Keeps the "localhost" device metadata, which carries bgp_asn, in the "meta" slot of the directory."""

    def __init__(self, daemon, directory):
        super(BGPDeviceMetaMgr, self).__init__(
            daemon,
            directory,
            [],
            "CONFIG_DB",
            swsscommon.CFG_DEVICE_METADATA_TABLE_NAME
        )

    def set_handler(self, key, data):
        """
        Store the metadata of "localhost" when it brings a new bgp_asn.

        :param key: key of the entry; everything except "localhost" is ignored
        :param data: dictionary with the attributes; ignored without "bgp_asn"
        :return: always True. This manager has no dependencies, so nothing would
            ever replay a queued request: a false result would only make
            Manager.handler() grow set_queue without bound.
        """
        if key != "localhost" or "bgp_asn" not in data:
            return True
        if self.directory.path_exist("meta", "localhost/bgp_asn"):
            bgp_asn = self.directory.get_path("meta", "localhost/bgp_asn")
            if bgp_asn == data["bgp_asn"]:
                # the ASN hasn't changed: don't notify the subscribers again
                return True
        self.directory.put("meta", key, data)
        return True

    def del_handler(self, key):
        """Remove the entry from the "meta" slot."""
        self.directory.remove("meta", key)
class BGPNeighborMetaMgr(Manager):
    """Mirrors the entries of the device neighbor metadata table into the "neigmeta" slot."""

    def __init__(self, daemon, directory):
        # This manager doesn't depend on any other data in the directory
        super(BGPNeighborMetaMgr, self).__init__(
            daemon,
            directory,
            deps=[],
            database="CONFIG_DB",
            table_name=swsscommon.CFG_DEVICE_NEIGHBOR_METADATA_TABLE_NAME,
        )

    def set_handler(self, key, data):
        """
        Store the attributes of a neighbor.

        :param key: key of the entry
        :param data: dictionary with the attributes
        :return: always True
        """
        self.directory.put("neigmeta", key, data)
        return True

    def del_handler(self, key):
        """Drop a neighbor from the "neigmeta" slot."""
        self.directory.remove("neigmeta", key)
class BGPPeerMgr(Manager):
    """
    Applies the changes of the BGP neighbor table to the routing stack via vtysh.

    self.peers holds the peers known to be configured, as "vrf|neighbor"
    strings, i.e. in the form produced by normalize_key().
    """

    def __init__(self, daemon, directory):
        super(BGPPeerMgr, self).__init__(
            daemon,
            directory,
            [
                ("meta", "localhost/bgp_asn"),
                ("neigmeta", ""),
            ],
            "CONFIG_DB",
            swsscommon.CFG_BGP_NEIGHBOR_TABLE_NAME
        )
        self.peers = self.load_peers()
        fabric = TemplateFabric()
        self.templates = {
            "add": fabric.from_file('bgpd.peer.conf.j2'),
            "delete": fabric.from_string('no neighbor {{ neighbor_addr }}'),
            "shutdown": fabric.from_string('neighbor {{ neighbor_addr }} shutdown'),
            "no shutdown": fabric.from_string('no neighbor {{ neighbor_addr }} shutdown'),
        }

    def set_handler(self, key, data):
        """
        Add a new peer, or change the admin status of an already configured one.

        :param key: key of the entry: "neighbor" or "vrf|neighbor"
        :param data: dictionary with the attributes of the peer
        :return: False if the request must be retried later (the metadata of
            the neighbor isn't available yet), True otherwise
        """
        key = self.normalize_key(key)
        vrf, nbr = key.split('|', 1)
        if key not in self.peers:
            return self._add_peer(key, vrf, nbr, data)
        # when the peer is already configured we support "shutdown/no shutdown"
        # commands for the peers only
        self._set_admin_status(key, vrf, nbr, data)
        return True

    def _add_peer(self, key, vrf, nbr, data):
        """Render and apply the "add" template. Returns False if the request must be retried."""
        neigmeta = self.directory.get_slot("neigmeta")
        if 'name' in data and data["name"] not in neigmeta:
            return False
        try:
            cmd = self.templates["add"].render(
                DEVICE_METADATA=self.directory.get_slot("meta"),
                DEVICE_NEIGHBOR_METADATA=neigmeta,
                neighbor_addr=nbr,
                bgp_session=data
            )
        except Exception:
            # A rendering error won't go away on retry: log it and drop the request
            syslog.syslog(syslog.LOG_ERR, 'Peer {}. Error in rendering the template for "SET" command {}'.format(key, data))
            return True
        if self.apply_op(cmd, vrf):
            self.peers.add(key)
            syslog.syslog(syslog.LOG_INFO, 'Peer {} added with attributes {}'.format(key, data))
        else:
            syslog.syslog(syslog.LOG_ERR, "Peer {} wasn't added.".format(key))
        return True

    def _set_admin_status(self, key, vrf, nbr, data):
        """Apply "shutdown" / "no shutdown" to a configured peer according to data['admin_status']."""
        if "admin_status" not in data:
            syslog.syslog(syslog.LOG_ERR, "Peer {}: Can't update the peer. No 'admin_status' attribute in the request".format(key))
            return
        admin_status = data['admin_status']
        if admin_status == 'up':
            template = self.templates["no shutdown"]
        elif admin_status == 'down':
            template = self.templates["shutdown"]
        else:
            syslog.syslog(syslog.LOG_ERR, "Peer {}: Can't update the peer. has wrong attribute value attr['admin_status'] = '{}'".format(key, admin_status))
            return
        if self.apply_op(template.render(neighbor_addr=nbr), vrf):
            syslog.syslog(syslog.LOG_INFO, 'Peer {} admin state is set to "{}"'.format(key, admin_status))
        else:
            syslog.syslog(syslog.LOG_ERR, "Peer {} admin state wasn't set to '{}'.".format(key, admin_status))

    def del_handler(self, key):
        """
        Remove a configured peer.

        :param key: key of the entry: "neighbor" or "vrf|neighbor"
        """
        key = self.normalize_key(key)
        vrf, nbr = key.split('|', 1)
        if key not in self.peers:
            syslog.syslog(syslog.LOG_WARNING, 'Peer {} has not been found'.format(key))
            return
        cmd = self.templates["delete"].render(neighbor_addr=nbr)
        rc = self.apply_op(cmd, vrf)
        if rc:
            syslog.syslog(syslog.LOG_INFO, 'Peer {} has been removed'.format(key))
            self.peers.remove(key)
        else:
            syslog.syslog(syslog.LOG_ERR, "Peer {} hasn't been removed".format(key))

    def apply_op(self, cmd, vrf):
        """
        Run a configuration command in the "router bgp" context of the vrf.

        The command is written into a temporary file which is passed to "vtysh -f".

        :param cmd: text of the command(s)
        :param vrf: name of the vrf; 'default' selects the default instance
        :return: True if vtysh returned 0
        """
        bgp_asn = self.directory.get_slot("meta")["localhost"]["bgp_asn"]
        fd, tmp_filename = tempfile.mkstemp(dir='/tmp')
        try:
            with os.fdopen(fd, 'w') as fp:
                if vrf == 'default':
                    fp.write('router bgp %s\n' % bgp_asn)
                else:
                    fp.write('router bgp %s vrf %s\n' % (bgp_asn, vrf))
                fp.write("%s\n" % cmd)
            rc, _, _ = run_command(["vtysh", "-f", tmp_filename])
        finally:
            # don't leave the file behind, even if writing or running has failed
            os.remove(tmp_filename)
        return rc == 0

    @staticmethod
    def normalize_key(key):
        """
        Return the key in the "vrf|neighbor" form.

        :param key: "neighbor" or "vrf|neighbor"
        :return: the key, prefixed with 'default|' if it has no vrf part
        """
        if '|' not in key:
            return 'default|' + key
        else:
            return key

    @staticmethod
    def load_peers():
        """
        Read the peers which vtysh already reports as configured.

        :return: set of "vrf|neighbor" strings; empty if vtysh can't be queried
        """
        vrfs = []
        rc, out, err = run_command(["vtysh", "-c", "show bgp vrfs json"])
        if rc == 0:
            vrfs = json.loads(out)['vrfs'].keys()
        peers = set()
        for vrf in vrfs:
            command = ["vtysh", "-c", 'show bgp vrf {} neighbors json'.format(vrf)]
            rc, out, err = run_command(command)
            if rc == 0:
                for nbr in json.loads(out):
                    # Same form as normalize_key() produces: set_handler() and
                    # del_handler() look the peers up by this string, so a
                    # (vrf, nbr) tuple would never match.
                    # NOTE(review): assumes vtysh names the default vrf 'default' - confirm
                    peers.add(vrf + '|' + nbr)
        return peers
def wait_for_bgpd():
    """
    Wait until bgpd is listed in the output of vtysh "show daemons".

    The check is repeated with 100 ms pauses for up to 20 seconds.

    :raises RuntimeError: if bgpd hasn't appeared before the deadline
    """
    # bgpd gets 20 seconds to come up
    deadline = datetime.datetime.now() + datetime.timedelta(seconds=20)
    started_at = str(datetime.datetime.now())
    syslog.syslog(syslog.LOG_INFO, "Start waiting for bgpd: %s" % started_at)
    while datetime.datetime.now() < deadline:
        rc, out, _ = run_command(["vtysh", "-c", "show daemons"])
        if rc != 0 or "bgpd" not in out:
            time.sleep(0.1)  # pause for 100 ms before the next attempt
            continue
        connected_at = str(datetime.datetime.now())
        syslog.syslog(syslog.LOG_INFO, "bgpd connected to vtysh: %s" % connected_at)
        return
    raise RuntimeError("bgpd hasn't been started in 20 seconds")
def main():
    """Wait for bgpd, create all managers and run the daemon loop until it is stopped."""
    manager_classes = (
        BGPDeviceMetaMgr,
        BGPNeighborMetaMgr,
        BGPPeerMgr,
    )
    wait_for_bgpd()
    daemon = Daemon()
    directory = Directory()
    # The managers register themselves in the daemon and the directory when
    # they are constructed; they are created in the order listed above
    instances = []
    for manager_class in manager_classes:
        instances.append(manager_class(daemon, directory))
    daemon.run()
def signal_handler(signum, frame):
    """
    Signal handler: request the main loop to stop.

    :param signum: number of the received signal (not used)
    :param frame: interrupted stack frame (not used)
    """
    global g_run
    # The flag is only cleared here; Daemon.run() tests it on each iteration
    g_run = False
if __name__ == '__main__':
    # Exit code of the process: 0 - normal stop, -2 - RuntimeError, -1 - any other exception
    exit_code = 0
    try:
        syslog.openlog('bgpcfgd')
        # SIGTERM and SIGINT both ask the main loop to stop
        for signum in (signal.SIGTERM, signal.SIGINT):
            signal.signal(signum, signal_handler)
        main()
    except KeyboardInterrupt:
        syslog.syslog(syslog.LOG_NOTICE, "Keyboard interrupt")
    except RuntimeError as e:
        syslog.syslog(syslog.LOG_CRIT, "%s" % str(e))
        exit_code = -2
    except Exception as e:
        details = (str(e), traceback.format_exc())
        syslog.syslog(syslog.LOG_CRIT, "Got an exception %s: Traceback: %s" % details)
        exit_code = -1
    finally:
        syslog.closelog()
    try:
        sys.exit(exit_code)
    except SystemExit:
        # terminate immediately with the same code
        os._exit(exit_code)