[eventd] Fix eventd UT flakiness (#17055)

### Why I did it

Fix flakiness of eventd UT - run sub after capture service starts

##### Work item tracking
- Microsoft ADO **(number only)**:25650744

#### How I did it

Run sub socket after capture socket is initialized

#### How to verify it

Pipeline
This commit is contained in:
Zain Budhwani 2024-02-12 21:52:38 -08:00 committed by mssonicbld
parent 23d5c5af0c
commit 2718ecf1f4
5 changed files with 63 additions and 59 deletions

View File

@ -29,7 +29,7 @@ endif
-include rsyslog_plugin/subdir.mk -include rsyslog_plugin/subdir.mk
-include rsyslog_plugin_tests/subdir.mk -include rsyslog_plugin_tests/subdir.mk
all: sonic-eventd eventd-tool rsyslog-plugin all: sonic-eventd eventd-tests eventd-tool rsyslog-plugin rsyslog-plugin-tests
sonic-eventd: $(OBJS) sonic-eventd: $(OBJS)
@echo 'Building target: $@' @echo 'Building target: $@'

View File

@ -546,9 +546,9 @@ capture_service::set_control(capture_control_t ctrl, event_serialized_lst_t *lst
switch(ctrl) { switch(ctrl) {
case INIT_CAPTURE: case INIT_CAPTURE:
m_thr = thread(&capture_service::do_capture, this); m_thr = thread(&capture_service::do_capture, this);
for(int i=0; !m_cap_run && (i < 100); ++i) { for(int i=0; !m_cap_run && (i < CAPTURE_SERVICE_POLLING_RETRIES); ++i) {
/* Wait max a second for thread to init */ /* Wait max a second for thread to init */
this_thread::sleep_for(chrono::milliseconds(10)); this_thread::sleep_for(chrono::milliseconds(CAPTURE_SERVICE_POLLING_DURATION));
} }
RET_ON_ERR(m_cap_run, "Failed to init capture"); RET_ON_ERR(m_cap_run, "Failed to init capture");
m_ctrl = ctrl; m_ctrl = ctrl;

View File

@ -21,6 +21,8 @@ typedef enum {
#define EVENTS_STATS_FIELD_NAME "value" #define EVENTS_STATS_FIELD_NAME "value"
#define STATS_HEARTBEAT_MIN 300 #define STATS_HEARTBEAT_MIN 300
#define CAPTURE_SERVICE_POLLING_DURATION 10
#define CAPTURE_SERVICE_POLLING_RETRIES 100
/* /*
* Started by eventd_service. * Started by eventd_service.

View File

@ -152,25 +152,23 @@ static const test_data_t ldata[] = {
void run_cap(void *zctx, bool &term, string &read_source, void run_cap(void *zctx, bool &term, string &read_source,
int &cnt) int &cnt, bool &should_read_control)
{ {
void *mock_cap = zmq_socket (zctx, ZMQ_SUB); void *mock_cap = zmq_socket (zctx, ZMQ_SUB);
string source; string source;
internal_event_t ev_int; internal_event_t ev_int;
int block_ms = 200; int block_ms = 200;
int i=0; int i=0;
static int proxy_finished_init = false;
EXPECT_TRUE(NULL != mock_cap); EXPECT_TRUE(NULL != mock_cap);
EXPECT_EQ(0, zmq_connect(mock_cap, get_config(CAPTURE_END_KEY).c_str())); EXPECT_EQ(0, zmq_connect(mock_cap, get_config(CAPTURE_END_KEY).c_str()));
EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_SUBSCRIBE, "", 0)); EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_SUBSCRIBE, "", 0));
EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_RCVTIMEO, &block_ms, sizeof (block_ms))); EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_RCVTIMEO, &block_ms, sizeof (block_ms)));
if(!proxy_finished_init) { if(should_read_control) {
zmq_msg_t msg; zmq_msg_t msg;
zmq_msg_init(&msg); zmq_msg_init(&msg);
EXPECT_EQ(1, zmq_msg_recv(&msg, mock_cap, 0)); // Subscription message EXPECT_NE(1, zmq_msg_recv(&msg, mock_cap, 0)); // Subscription message should be read by do_capture
proxy_finished_init = true;
} }
while(!term) { while(!term) {
@ -227,10 +225,10 @@ void run_pub(void *mock_pub, const string wr_source, internal_events_lst_t &lst)
} }
} }
TEST(eventd, proxy) TEST(eventd, proxy)
{ {
printf("Proxy TEST started\n"); printf("Proxy TEST started\n");
bool should_read_control = false;
bool term_sub = false; bool term_sub = false;
bool term_cap = false; bool term_cap = false;
string rd_csource, rd_source, wr_source("hello"); string rd_csource, rd_source, wr_source("hello");
@ -247,12 +245,12 @@ TEST(eventd, proxy)
/* Starting proxy */ /* Starting proxy */
EXPECT_EQ(0, pxy->init()); EXPECT_EQ(0, pxy->init());
/* capture in a thread */
thread thrc(&run_cap, zctx, ref(term_cap), ref(rd_csource), ref(rd_cevts_sz), ref(should_read_control));
/* subscriber in a thread */ /* subscriber in a thread */
thread thr(&run_sub, zctx, ref(term_sub), ref(rd_source), ref(rd_evts), ref(rd_evts_sz)); thread thr(&run_sub, zctx, ref(term_sub), ref(rd_source), ref(rd_evts), ref(rd_evts_sz));
/* capture in a thread */
thread thrc(&run_cap, zctx, ref(term_cap), ref(rd_csource), ref(rd_cevts_sz));
/* Init pub connection */ /* Init pub connection */
void *mock_pub = init_pub(zctx); void *mock_pub = init_pub(zctx);
@ -275,9 +273,6 @@ TEST(eventd, proxy)
} }
this_thread::sleep_for(chrono::milliseconds(1000)); this_thread::sleep_for(chrono::milliseconds(1000));
delete pxy;
pxy = NULL;
term_sub = true; term_sub = true;
term_cap = true; term_cap = true;
@ -287,6 +282,18 @@ TEST(eventd, proxy)
EXPECT_EQ(rd_cevts_sz, wr_evts.size()); EXPECT_EQ(rd_cevts_sz, wr_evts.size());
zmq_close(mock_pub); zmq_close(mock_pub);
/* Do control test */
should_read_control = true;
/* capture in a thread */
thread thrcc(&run_cap, zctx, ref(term_cap), ref(rd_csource), ref(rd_cevts_sz), ref(should_read_control));
delete pxy;
pxy = NULL;
thrcc.join();
zmq_ctx_term(zctx); zmq_ctx_term(zctx);
/* Provide time for async proxy removal to complete */ /* Provide time for async proxy removal to complete */
@ -295,7 +302,6 @@ TEST(eventd, proxy)
printf("eventd_proxy is tested GOOD\n"); printf("eventd_proxy is tested GOOD\n");
} }
TEST(eventd, capture) TEST(eventd, capture)
{ {
printf("Capture TEST started\n"); printf("Capture TEST started\n");
@ -329,9 +335,6 @@ TEST(eventd, capture)
/* Starting proxy */ /* Starting proxy */
EXPECT_EQ(0, pxy->init()); EXPECT_EQ(0, pxy->init());
/* Run subscriber; Else publisher will drop events on floor, with no subscriber. */
thread thr_sub(&run_sub, zctx, ref(term_sub), ref(sub_source), ref(sub_evts), ref(sub_evts_sz));
/* Create capture service */ /* Create capture service */
capture_service *pcap = new capture_service(zctx, cache_max, &stats_instance); capture_service *pcap = new capture_service(zctx, cache_max, &stats_instance);
@ -341,6 +344,9 @@ TEST(eventd, capture)
/* Initialize the capture */ /* Initialize the capture */
EXPECT_EQ(0, pcap->set_control(INIT_CAPTURE)); EXPECT_EQ(0, pcap->set_control(INIT_CAPTURE));
/* Run subscriber; Else publisher will drop events on floor, with no subscriber. */
thread thr_sub(&run_sub, zctx, ref(term_sub), ref(sub_source), ref(sub_evts), ref(sub_evts_sz));
EXPECT_TRUE(init_cache > 1); EXPECT_TRUE(init_cache > 1);
EXPECT_TRUE((cache_max+3) < (int)ARRAY_SIZE(ldata)); EXPECT_TRUE((cache_max+3) < (int)ARRAY_SIZE(ldata));
@ -473,9 +479,6 @@ TEST(eventd, captureCacheMax)
/* Starting proxy */ /* Starting proxy */
EXPECT_EQ(0, pxy->init()); EXPECT_EQ(0, pxy->init());
/* Run subscriber; Else publisher will drop events on floor, with no subscriber. */
thread thr_sub(&run_sub, zctx, ref(term_sub), ref(sub_source), ref(sub_evts), ref(sub_evts_sz));
/* Create capture service */ /* Create capture service */
capture_service *pcap = new capture_service(zctx, cache_max, &stats_instance); capture_service *pcap = new capture_service(zctx, cache_max, &stats_instance);
@ -484,6 +487,9 @@ TEST(eventd, captureCacheMax)
EXPECT_TRUE(init_cache > 1); EXPECT_TRUE(init_cache > 1);
/* Run subscriber; Else publisher will drop events on floor, with no subscriber. */
thread thr_sub(&run_sub, zctx, ref(term_sub), ref(sub_source), ref(sub_evts), ref(sub_evts_sz));
/* Collect few serailized strings of events for startup cache */ /* Collect few serailized strings of events for startup cache */
for(int i=0; i < init_cache; ++i) { for(int i=0; i < init_cache; ++i) {
internal_event_t ev(create_ev(ldata[i])); internal_event_t ev(create_ev(ldata[i]));
@ -595,6 +601,7 @@ TEST(eventd, service)
} }
thread thread_service(&run_eventd_service); thread thread_service(&run_eventd_service);
this_thread::sleep_for(chrono::milliseconds(CAPTURE_SERVICE_POLLING_DURATION * CAPTURE_SERVICE_POLLING_RETRIES));
/* Need client side service to interact with server side */ /* Need client side service to interact with server side */
EXPECT_EQ(0, service.init_client(zctx)); EXPECT_EQ(0, service.init_client(zctx));
@ -610,7 +617,7 @@ TEST(eventd, service)
string wr_source("hello"); string wr_source("hello");
/* Test service startup caching */ /* Test service startup caching */
event_serialized_lst_t evts_start, evts_read; event_serialized_lst_t evts_start, evts_read, polled_events;
for(int i=0; i<wr_sz; ++i) { for(int i=0; i<wr_sz; ++i) {
string evt_str; string evt_str;
@ -624,15 +631,32 @@ TEST(eventd, service)
/* Publish events. */ /* Publish events. */
run_pub(mock_pub, wr_source, wr_evts); run_pub(mock_pub, wr_source, wr_evts);
/* Published events must have been captured. Give a pause, to ensure sent. */ int max_polling_duration = 2000;
this_thread::sleep_for(chrono::milliseconds(200)); int polling_interval = 100;
auto poll_start_ts = chrono::steady_clock::now();
while(true) {
auto current_ts = chrono::steady_clock::now();
if(chrono::duration_cast<chrono::milliseconds>(current_ts - poll_start_ts).count() >= max_polling_duration) {
break;
}
event_serialized_lst_t read_events;
service.cache_read(read_events);
polled_events.insert(polled_events.end(), read_events.begin(), read_events.end());
if (!read_events.empty()) {
break;
}
this_thread::sleep_for(chrono::milliseconds(polling_interval));
}
EXPECT_EQ(0, service.cache_stop()); EXPECT_EQ(0, service.cache_stop());
/* Read the cache; expect wr_sz events */ /* Read remaining events in cache, if any */
EXPECT_EQ(0, service.cache_read(evts_read)); EXPECT_EQ(0, service.cache_read(evts_read));
EXPECT_EQ(evts_read, evts_start); polled_events.insert(polled_events.end(), evts_read.begin(), evts_read.end());
EXPECT_EQ(polled_events, evts_start);
zmq_close(mock_pub); zmq_close(mock_pub);
} }

View File

@ -1,6 +1,7 @@
#include "gtest/gtest.h" #include "gtest/gtest.h"
#include "dbconnector.h" #include "dbconnector.h"
#include <iostream> #include <iostream>
#include <stdexcept>
using namespace std; using namespace std;
using namespace swss; using namespace swss;
@ -20,57 +21,34 @@ public:
// Override this to define how to set up the environment // Override this to define how to set up the environment
void SetUp() override { void SetUp() override {
// by default , init should be false // by default , init should be false
cout<<"Default : isInit = "<<SonicDBConfig::isInit()<<endl; cout << "Default : isInit = " << SonicDBConfig::isInit() << endl;
EXPECT_FALSE(SonicDBConfig::isInit()); EXPECT_FALSE(SonicDBConfig::isInit());
// load nonexisting file, should throw exception with NO file existing EXPECT_THROW(SonicDBConfig::initialize(nonexisting_file), runtime_error);
try
{
cout<<"INIT: loading nonexisting db config file"<<endl;
SonicDBConfig::initialize(nonexisting_file);
}
catch (exception &e)
{
EXPECT_TRUE(strstr(e.what(), "Sonic database config file doesn't exist"));
}
EXPECT_FALSE(SonicDBConfig::isInit()); EXPECT_FALSE(SonicDBConfig::isInit());
// load local config file, init should be true // load local config file, init should be true
SonicDBConfig::initialize(existing_file); SonicDBConfig::initialize(existing_file);
cout<<"INIT: load local db config file, isInit = "<<SonicDBConfig::isInit()<<endl; cout << "INIT: load local db config file, isInit = " << SonicDBConfig::isInit() << endl;
EXPECT_TRUE(SonicDBConfig::isInit()); EXPECT_TRUE(SonicDBConfig::isInit());
// Test the database_global.json file // Test the database_global.json file
// by default , global_init should be false // by default , global_init should be false
cout<<"Default : isGlobalInit = "<<SonicDBConfig::isGlobalInit()<<endl; cout << "Default : isGlobalInit = " << SonicDBConfig::isGlobalInit() << endl;
EXPECT_FALSE(SonicDBConfig::isGlobalInit()); EXPECT_FALSE(SonicDBConfig::isGlobalInit());
// Call an API which actually needs the data populated by SonicDBConfig::initializeGlobalConfig // Call an API which actually needs the data populated by SonicDBConfig::initializeGlobalConfig
try EXPECT_THROW(SonicDBConfig::getDbId(TEST_DB, TEST_NAMESPACE), runtime_error);
{
cout<<"INIT: Invoking SonicDBConfig::getDbId(APPL_DB, asic0)"<<endl;
SonicDBConfig::getDbId(TEST_DB, TEST_NAMESPACE);
}
catch (exception &e)
{
EXPECT_TRUE(strstr(e.what(), "Initialize global DB config using API SonicDBConfig::initializeGlobalConfig"));
}
// load local global file, init should be true // load local global file, init should be true
SonicDBConfig::initializeGlobalConfig(global_existing_file); SonicDBConfig::initializeGlobalConfig(global_existing_file);
cout<<"INIT: load global db config file, isInit = "<<SonicDBConfig::isGlobalInit()<<endl; cout << "INIT: load global db config file, isInit = " << SonicDBConfig::isGlobalInit() << endl;
EXPECT_TRUE(SonicDBConfig::isGlobalInit()); EXPECT_TRUE(SonicDBConfig::isGlobalInit());
// Call an API with wrong namespace passed // Call an API with wrong namespace passed
try cout << "INIT: Invoking SonicDBConfig::getDbId(APPL_DB, invalid)" << endl;
{ EXPECT_THROW(SonicDBConfig::getDbId(TEST_DB, INVALID_NAMESPACE), out_of_range);
cout<<"INIT: Invoking SonicDBConfig::getDbId(APPL_DB, invalid)"<<endl;
SonicDBConfig::getDbId(TEST_DB, INVALID_NAMESPACE);
}
catch (exception &e)
{
EXPECT_TRUE(strstr(e.what(), "Namespace invalid is not a valid namespace name in config file"));
}
// Get this info handy // Get this info handy
try try