Fix swsscommon psubscribe code break in frrcfgd (#13836)
Fix swsscommon psubscribe code break in frrcfgd #### Why I did it Fix frrcfgd psubscribe code break: https://github.com/sonic-net/sonic-buildimage/issues/13109 The code issue caused by API change when migrate from swsssdk to swsscommon #### How I did it Fix frrcfgd code to use swsscommon psubscribe API. #### How to verify it Pass all UT. Manually check fixed code work correctly.
This commit is contained in:
parent
165e33b4e4
commit
fabb30f2e9
@ -1445,6 +1445,7 @@ class ExtConfigDBConnector(ConfigDBConnector):
|
|||||||
def __init__(self, ns_attrs = None):
|
def __init__(self, ns_attrs = None):
|
||||||
super(ExtConfigDBConnector, self).__init__()
|
super(ExtConfigDBConnector, self).__init__()
|
||||||
self.nosort_attrs = ns_attrs if ns_attrs is not None else {}
|
self.nosort_attrs = ns_attrs if ns_attrs is not None else {}
|
||||||
|
self.__listen_thread_running = False
|
||||||
def raw_to_typed(self, raw_data, table = ''):
|
def raw_to_typed(self, raw_data, table = ''):
|
||||||
if len(raw_data) == 0:
|
if len(raw_data) == 0:
|
||||||
raw_data = None
|
raw_data = None
|
||||||
@ -1469,12 +1470,28 @@ class ExtConfigDBConnector(ConfigDBConnector):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
syslog.syslog(syslog.LOG_ERR, '[bgp cfgd] Failed handling config DB update with exception:' + str(e))
|
syslog.syslog(syslog.LOG_ERR, '[bgp cfgd] Failed handling config DB update with exception:' + str(e))
|
||||||
logging.exception(e)
|
logging.exception(e)
|
||||||
|
|
||||||
|
def listen_thread(self, timeout):
|
||||||
|
self.__listen_thread_running = True
|
||||||
|
sub_key_space = "__keyspace@{}__:*".format(self.get_dbid(self.db_name))
|
||||||
|
self.pubsub.psubscribe(sub_key_space)
|
||||||
|
while self.__listen_thread_running:
|
||||||
|
msg = self.pubsub.get_message(timeout, True)
|
||||||
|
if msg:
|
||||||
|
self.sub_msg_handler(msg)
|
||||||
|
|
||||||
|
self.pubsub.punsubscribe(sub_key_space)
|
||||||
|
|
||||||
def listen(self):
|
def listen(self):
|
||||||
"""Start listen Redis keyspace events and will trigger corresponding handlers when content of a table changes.
|
"""Start listen Redis keyspace events and will trigger corresponding handlers when content of a table changes.
|
||||||
"""
|
"""
|
||||||
self.pubsub = self.get_redis_client(self.db_name).pubsub()
|
self.pubsub = self.get_redis_client(self.db_name).pubsub()
|
||||||
self.pubsub.psubscribe(**{"__keyspace@{}__:*".format(self.get_dbid(self.db_name)): self.sub_msg_handler})
|
self.sub_thread = threading.Thread(target=self.listen_thread, args=(0.01,))
|
||||||
self.sub_thread = self.pubsub.run_in_thread(sleep_time = 0.01)
|
self.sub_thread.start()
|
||||||
|
|
||||||
|
def stop_listen(self):
|
||||||
|
self.__listen_thread_running = False
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_table_key(table, key):
|
def get_table_key(table, key):
|
||||||
return table + '&&' + key
|
return table + '&&' + key
|
||||||
@ -3774,7 +3791,7 @@ class BGPConfigDaemon:
|
|||||||
self.subscribe_all()
|
self.subscribe_all()
|
||||||
self.config_db.listen()
|
self.config_db.listen()
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self.config_db.sub_thread.stop()
|
self.config_db.stop_listen()
|
||||||
if self.config_db.sub_thread.is_alive():
|
if self.config_db.sub_thread.is_alive():
|
||||||
self.config_db.sub_thread.join()
|
self.config_db.sub_thread.join()
|
||||||
|
|
||||||
|
@ -14,10 +14,10 @@ def test_contructor():
|
|||||||
for table, hdlr in daemon.table_handler_list:
|
for table, hdlr in daemon.table_handler_list:
|
||||||
daemon.config_db.subscribe.assert_any_call(table, hdlr)
|
daemon.config_db.subscribe.assert_any_call(table, hdlr)
|
||||||
daemon.config_db.pubsub.psubscribe.assert_called_once()
|
daemon.config_db.pubsub.psubscribe.assert_called_once()
|
||||||
|
assert(daemon.config_db.sub_thread.is_alive() == True)
|
||||||
daemon.stop()
|
daemon.stop()
|
||||||
daemon.config_db.sub_thread.stop.assert_called()
|
daemon.config_db.pubsub.punsubscribe.assert_called_once()
|
||||||
daemon.config_db.sub_thread.is_alive.assert_called_once()
|
assert(daemon.config_db.sub_thread.is_alive() == False)
|
||||||
daemon.config_db.sub_thread.join.assert_called_once()
|
|
||||||
|
|
||||||
class CmdMapTestInfo:
|
class CmdMapTestInfo:
|
||||||
data_buf = {}
|
data_buf = {}
|
||||||
|
Reference in New Issue
Block a user