diff --git a/dockers/docker-sflow/port_index_mapper.py b/dockers/docker-sflow/port_index_mapper.py index 24f078b20f..3f8a2819a8 100755 --- a/dockers/docker-sflow/port_index_mapper.py +++ b/dockers/docker-sflow/port_index_mapper.py @@ -1,114 +1,119 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 -""" port_index_mapper - A mapper service that watches for NetLink NEWLINK and DELLINKs - to construct a PORT_INDEX_TABLE in state DB which includes the - interface name, the interface index and the ifindex. - - Note : Currently supports only interfaces supported by port_util. -""" - -import os -import sys -import syslog import signal +import sys import traceback -from pyroute2 import IPDB -from pyroute2.iproute import RTM_NEWLINK, RTM_DELLINK +from sonic_py_common.logger import Logger +from socket import if_nametoindex from swsssdk import SonicV2Connector, port_util +from swsscommon import swsscommon -PORT_INDEX_TABLE_NAME = 'PORT_INDEX_TABLE' SYSLOG_IDENTIFIER = 'port_index_mapper' -ipdb = None -state_db = None +# Global logger instance +logger = Logger(SYSLOG_IDENTIFIER) +logger.set_min_log_priority_info() -def set_port_index_table_entry(key, index, ifindex): - state_db.set(state_db.STATE_DB, key, 'index', index) - state_db.set(state_db.STATE_DB, key, 'ifindex', ifindex) +class PortIndexMapper(object): -def interface_callback(ipdb, nlmsg, action): - global state_db + def __init__(self): + REDIS_TIMEOUT_MS = 0 + # Update this list to support more interfaces + tbl_lst = [swsscommon.STATE_PORT_TABLE_NAME, + swsscommon.STATE_VLAN_TABLE_NAME] + self.appl_db = swsscommon.DBConnector("STATE_DB", + REDIS_TIMEOUT_MS, + True) - try: - msgtype = nlmsg['header']['type'] - if (msgtype != RTM_NEWLINK and msgtype != RTM_DELLINK): + self.state_db = SonicV2Connector(host='127.0.0.1', decode_responses=True) + self.state_db.connect(self.state_db.STATE_DB, False) + self.sel = swsscommon.Select() + self.tlbs = [swsscommon.SubscriberStateTable(self.appl_db, t) + for t in tbl_lst] + + self.cur_interfaces = {} + + for t in self.tlbs: + self.sel.addSelectable(t) + + def set_port_index_table_entry(self, key, index, ifindex): + self.state_db.set(self.state_db.STATE_DB, key, 'index', index) + self.state_db.set(self.state_db.STATE_DB, key, 'ifindex', ifindex) + + def update_db(self, ifname, op): + index = port_util.get_index_from_str(ifname) + if op == 'SET' and index is None: + return + ifindex = if_nametoindex(ifname) + if op == 'SET' and ifindex is None: return - # filter out unwanted messages - change = nlmsg['change'] - if (change != 0xFFFFFFFF): + # Check if ifname already exist or if index/ifindex changed due to + # syncd restart + if (ifname in self.cur_interfaces and + self.cur_interfaces[ifname] == (index, ifindex)): return - - attrs = nlmsg['attrs'] - for list in attrs: - if list[0] == 'IFLA_IFNAME': - ifname = list[1] + + _hash = '{}|{}'.format('PORT_INDEX_TABLE', ifname) + + if op == 'SET': + self.cur_interfaces[ifname] = (index, ifindex) + self.set_port_index_table_entry(_hash, str(index), str(ifindex)) + elif op == 'DEL': + del self.cur_interfaces[ifname] + self.state_db.delete(self.state_db.STATE_DB, _hash) + + def listen(self): + SELECT_TIMEOUT_MS = -1 # Infinite wait + + while True: + (state, c) = self.sel.select(SELECT_TIMEOUT_MS) + if state == swsscommon.Select.OBJECT: + for t in self.tlbs: + (key, op, cfvs) = t.pop() + if op == 'DEL' and key in self.cur_interfaces: + self.update_db(key, op) + elif (op == 'SET' and key != 'PortInitDone' and + key not in self.cur_interfaces): + self.update_db(key, op) + elif state == swsscomm.Select.ERROR: + logger.log_error("Receieved error from select()") break + + def populate(self): + SELECT_TIMEOUT_MS = 0 + + while True: + (state, c) = self.sel.select(SELECT_TIMEOUT_MS) + if state == swsscommon.Select.OBJECT: + for t in self.tlbs: + (key, op, cfvs) = t.pop() + if key and key != 'PortInitDone': + self.update_db(key, op) else: - return - - # Extract the port index from the interface name - index = port_util.get_index_from_str(ifname) - if index is None: - return - - _hash = '{}|{}'.format(PORT_INDEX_TABLE_NAME, ifname) - - if msgtype == RTM_NEWLINK: - set_port_index_table_entry(_hash, str(index), nlmsg['index']) - elif msgtype == RTM_DELLINK: - state_db.delete(state_db.STATE_DB, _hash) + break - except Exception, e: - t = sys.exc_info()[2] - traceback.print_tb(t) - syslog.syslog(syslog.LOG_CRIT, "%s" % str(e)) - os.kill(os.getpid(), signal.SIGTERM) - -def main(): - global state_db, ipdb - state_db = SonicV2Connector(host='127.0.0.1') - state_db.connect(state_db.STATE_DB, False) - - ipdb = IPDB() - - # Initialize the table at startup. - ifnames = ipdb.by_name.keys() - for ifname in ifnames: - index = port_util.get_index_from_str(ifname) - if index is None: - continue - ifindex = ipdb.interfaces[ifname]['index'] - _hash = '{}|{}'.format(PORT_INDEX_TABLE_NAME, ifname) - set_port_index_table_entry(_hash, str(index), str(ifindex)) - - ipdb.register_callback(interface_callback) - - signal.pause() def signal_handler(signum, frame): - syslog.syslog(syslog.LOG_NOTICE, "got signal %d" % signum) + logger.log_notice("got signal {}".format(signum)) sys.exit(0) + +def main(): + port_mapper = PortIndexMapper() + port_mapper.populate() + port_mapper.listen() + if __name__ == '__main__': rc = 0 try: - syslog.openlog(SYSLOG_IDENTIFIER) signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) main() - except Exception, e: - t = sys.exc_info()[2] - traceback.print_tb(t) - syslog.syslog(syslog.LOG_CRIT, "%s" % str(e)) + except Exception as e: + tb = sys.exc_info()[2] + traceback.print_tb(tb) + logger.log_error("%s" % str(e)) rc = -1 finally: - if ipdb is not None: - ipdb.release() - else: - syslog.syslog(syslog.LOG_ERR, "ipdb undefined in signal_handler") - - syslog.closelog() sys.exit(rc) -