[eventd] Fix eventd UT flakiness (#17055)
### Why I did it Fix flakiness of eventd UT - run sub after capture service starts ##### Work item tracking - Microsoft ADO **(number only)**:25650744 #### How I did it Run sub socket after capture socket is initialized #### How to verify it Pipeline
This commit is contained in:
parent
c6602c9585
commit
ff5efe8fb3
@ -29,7 +29,7 @@ endif
|
||||
-include rsyslog_plugin/subdir.mk
|
||||
-include rsyslog_plugin_tests/subdir.mk
|
||||
|
||||
all: sonic-eventd eventd-tool rsyslog-plugin
|
||||
all: sonic-eventd eventd-tests eventd-tool rsyslog-plugin rsyslog-plugin-tests
|
||||
|
||||
sonic-eventd: $(OBJS)
|
||||
@echo 'Building target: $@'
|
||||
|
@ -546,9 +546,9 @@ capture_service::set_control(capture_control_t ctrl, event_serialized_lst_t *lst
|
||||
switch(ctrl) {
|
||||
case INIT_CAPTURE:
|
||||
m_thr = thread(&capture_service::do_capture, this);
|
||||
for(int i=0; !m_cap_run && (i < 100); ++i) {
|
||||
for(int i=0; !m_cap_run && (i < CAPTURE_SERVICE_POLLING_RETRIES); ++i) {
|
||||
/* Wait max a second for thread to init */
|
||||
this_thread::sleep_for(chrono::milliseconds(10));
|
||||
this_thread::sleep_for(chrono::milliseconds(CAPTURE_SERVICE_POLLING_DURATION));
|
||||
}
|
||||
RET_ON_ERR(m_cap_run, "Failed to init capture");
|
||||
m_ctrl = ctrl;
|
||||
|
@ -21,6 +21,8 @@ typedef enum {
|
||||
|
||||
#define EVENTS_STATS_FIELD_NAME "value"
|
||||
#define STATS_HEARTBEAT_MIN 300
|
||||
#define CAPTURE_SERVICE_POLLING_DURATION 10
|
||||
#define CAPTURE_SERVICE_POLLING_RETRIES 100
|
||||
|
||||
/*
|
||||
* Started by eventd_service.
|
||||
|
@ -152,25 +152,23 @@ static const test_data_t ldata[] = {
|
||||
|
||||
|
||||
void run_cap(void *zctx, bool &term, string &read_source,
|
||||
int &cnt)
|
||||
int &cnt, bool &should_read_control)
|
||||
{
|
||||
void *mock_cap = zmq_socket (zctx, ZMQ_SUB);
|
||||
string source;
|
||||
internal_event_t ev_int;
|
||||
int block_ms = 200;
|
||||
int i=0;
|
||||
static int proxy_finished_init = false;
|
||||
|
||||
EXPECT_TRUE(NULL != mock_cap);
|
||||
EXPECT_EQ(0, zmq_connect(mock_cap, get_config(CAPTURE_END_KEY).c_str()));
|
||||
EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_SUBSCRIBE, "", 0));
|
||||
EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_RCVTIMEO, &block_ms, sizeof (block_ms)));
|
||||
|
||||
if(!proxy_finished_init) {
|
||||
if(should_read_control) {
|
||||
zmq_msg_t msg;
|
||||
zmq_msg_init(&msg);
|
||||
EXPECT_EQ(1, zmq_msg_recv(&msg, mock_cap, 0)); // Subscription message
|
||||
proxy_finished_init = true;
|
||||
EXPECT_NE(1, zmq_msg_recv(&msg, mock_cap, 0)); // Subscription message should be read by do_capture
|
||||
}
|
||||
|
||||
while(!term) {
|
||||
@ -227,10 +225,10 @@ void run_pub(void *mock_pub, const string wr_source, internal_events_lst_t &lst)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
TEST(eventd, proxy)
|
||||
{
|
||||
printf("Proxy TEST started\n");
|
||||
bool should_read_control = false;
|
||||
bool term_sub = false;
|
||||
bool term_cap = false;
|
||||
string rd_csource, rd_source, wr_source("hello");
|
||||
@ -247,12 +245,12 @@ TEST(eventd, proxy)
|
||||
/* Starting proxy */
|
||||
EXPECT_EQ(0, pxy->init());
|
||||
|
||||
/* capture in a thread */
|
||||
thread thrc(&run_cap, zctx, ref(term_cap), ref(rd_csource), ref(rd_cevts_sz), ref(should_read_control));
|
||||
|
||||
/* subscriber in a thread */
|
||||
thread thr(&run_sub, zctx, ref(term_sub), ref(rd_source), ref(rd_evts), ref(rd_evts_sz));
|
||||
|
||||
/* capture in a thread */
|
||||
thread thrc(&run_cap, zctx, ref(term_cap), ref(rd_csource), ref(rd_cevts_sz));
|
||||
|
||||
/* Init pub connection */
|
||||
void *mock_pub = init_pub(zctx);
|
||||
|
||||
@ -275,9 +273,6 @@ TEST(eventd, proxy)
|
||||
}
|
||||
this_thread::sleep_for(chrono::milliseconds(1000));
|
||||
|
||||
delete pxy;
|
||||
pxy = NULL;
|
||||
|
||||
term_sub = true;
|
||||
term_cap = true;
|
||||
|
||||
@ -287,6 +282,18 @@ TEST(eventd, proxy)
|
||||
EXPECT_EQ(rd_cevts_sz, wr_evts.size());
|
||||
|
||||
zmq_close(mock_pub);
|
||||
|
||||
/* Do control test */
|
||||
|
||||
should_read_control = true;
|
||||
|
||||
/* capture in a thread */
|
||||
thread thrcc(&run_cap, zctx, ref(term_cap), ref(rd_csource), ref(rd_cevts_sz), ref(should_read_control));
|
||||
|
||||
delete pxy;
|
||||
pxy = NULL;
|
||||
|
||||
thrcc.join();
|
||||
zmq_ctx_term(zctx);
|
||||
|
||||
/* Provide time for async proxy removal to complete */
|
||||
@ -295,7 +302,6 @@ TEST(eventd, proxy)
|
||||
printf("eventd_proxy is tested GOOD\n");
|
||||
}
|
||||
|
||||
|
||||
TEST(eventd, capture)
|
||||
{
|
||||
printf("Capture TEST started\n");
|
||||
@ -329,9 +335,6 @@ TEST(eventd, capture)
|
||||
/* Starting proxy */
|
||||
EXPECT_EQ(0, pxy->init());
|
||||
|
||||
/* Run subscriber; Else publisher will drop events on floor, with no subscriber. */
|
||||
thread thr_sub(&run_sub, zctx, ref(term_sub), ref(sub_source), ref(sub_evts), ref(sub_evts_sz));
|
||||
|
||||
/* Create capture service */
|
||||
capture_service *pcap = new capture_service(zctx, cache_max, &stats_instance);
|
||||
|
||||
@ -341,6 +344,9 @@ TEST(eventd, capture)
|
||||
/* Initialize the capture */
|
||||
EXPECT_EQ(0, pcap->set_control(INIT_CAPTURE));
|
||||
|
||||
/* Run subscriber; Else publisher will drop events on floor, with no subscriber. */
|
||||
thread thr_sub(&run_sub, zctx, ref(term_sub), ref(sub_source), ref(sub_evts), ref(sub_evts_sz));
|
||||
|
||||
EXPECT_TRUE(init_cache > 1);
|
||||
EXPECT_TRUE((cache_max+3) < (int)ARRAY_SIZE(ldata));
|
||||
|
||||
@ -473,9 +479,6 @@ TEST(eventd, captureCacheMax)
|
||||
/* Starting proxy */
|
||||
EXPECT_EQ(0, pxy->init());
|
||||
|
||||
/* Run subscriber; Else publisher will drop events on floor, with no subscriber. */
|
||||
thread thr_sub(&run_sub, zctx, ref(term_sub), ref(sub_source), ref(sub_evts), ref(sub_evts_sz));
|
||||
|
||||
/* Create capture service */
|
||||
capture_service *pcap = new capture_service(zctx, cache_max, &stats_instance);
|
||||
|
||||
@ -484,6 +487,9 @@ TEST(eventd, captureCacheMax)
|
||||
|
||||
EXPECT_TRUE(init_cache > 1);
|
||||
|
||||
/* Run subscriber; Else publisher will drop events on floor, with no subscriber. */
|
||||
thread thr_sub(&run_sub, zctx, ref(term_sub), ref(sub_source), ref(sub_evts), ref(sub_evts_sz));
|
||||
|
||||
/* Collect few serailized strings of events for startup cache */
|
||||
for(int i=0; i < init_cache; ++i) {
|
||||
internal_event_t ev(create_ev(ldata[i]));
|
||||
@ -595,6 +601,7 @@ TEST(eventd, service)
|
||||
}
|
||||
|
||||
thread thread_service(&run_eventd_service);
|
||||
this_thread::sleep_for(chrono::milliseconds(CAPTURE_SERVICE_POLLING_DURATION * CAPTURE_SERVICE_POLLING_RETRIES));
|
||||
|
||||
/* Need client side service to interact with server side */
|
||||
EXPECT_EQ(0, service.init_client(zctx));
|
||||
@ -610,7 +617,7 @@ TEST(eventd, service)
|
||||
string wr_source("hello");
|
||||
|
||||
/* Test service startup caching */
|
||||
event_serialized_lst_t evts_start, evts_read;
|
||||
event_serialized_lst_t evts_start, evts_read, polled_events;
|
||||
|
||||
for(int i=0; i<wr_sz; ++i) {
|
||||
string evt_str;
|
||||
@ -624,15 +631,32 @@ TEST(eventd, service)
|
||||
/* Publish events. */
|
||||
run_pub(mock_pub, wr_source, wr_evts);
|
||||
|
||||
/* Published events must have been captured. Give a pause, to ensure sent. */
|
||||
this_thread::sleep_for(chrono::milliseconds(200));
|
||||
int max_polling_duration = 2000;
|
||||
int polling_interval = 100;
|
||||
auto poll_start_ts = chrono::steady_clock::now();
|
||||
|
||||
while(true) {
|
||||
auto current_ts = chrono::steady_clock::now();
|
||||
if(chrono::duration_cast<chrono::milliseconds>(current_ts - poll_start_ts).count() >= max_polling_duration) {
|
||||
break;
|
||||
}
|
||||
event_serialized_lst_t read_events;
|
||||
service.cache_read(read_events);
|
||||
polled_events.insert(polled_events.end(), read_events.begin(), read_events.end());
|
||||
if (!read_events.empty()) {
|
||||
break;
|
||||
}
|
||||
this_thread::sleep_for(chrono::milliseconds(polling_interval));
|
||||
}
|
||||
|
||||
EXPECT_EQ(0, service.cache_stop());
|
||||
|
||||
/* Read the cache; expect wr_sz events */
|
||||
/* Read remaining events in cache, if any */
|
||||
EXPECT_EQ(0, service.cache_read(evts_read));
|
||||
|
||||
EXPECT_EQ(evts_read, evts_start);
|
||||
polled_events.insert(polled_events.end(), evts_read.begin(), evts_read.end());
|
||||
|
||||
EXPECT_EQ(polled_events, evts_start);
|
||||
|
||||
zmq_close(mock_pub);
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
#include "gtest/gtest.h"
|
||||
#include "dbconnector.h"
|
||||
#include <iostream>
|
||||
#include <stdexcept>
|
||||
|
||||
using namespace std;
|
||||
using namespace swss;
|
||||
@ -20,57 +21,34 @@ public:
|
||||
// Override this to define how to set up the environment
|
||||
void SetUp() override {
|
||||
// by default , init should be false
|
||||
cout<<"Default : isInit = "<<SonicDBConfig::isInit()<<endl;
|
||||
cout << "Default : isInit = " << SonicDBConfig::isInit() << endl;
|
||||
EXPECT_FALSE(SonicDBConfig::isInit());
|
||||
|
||||
// load nonexisting file, should throw exception with NO file existing
|
||||
try
|
||||
{
|
||||
cout<<"INIT: loading nonexisting db config file"<<endl;
|
||||
SonicDBConfig::initialize(nonexisting_file);
|
||||
}
|
||||
catch (exception &e)
|
||||
{
|
||||
EXPECT_TRUE(strstr(e.what(), "Sonic database config file doesn't exist"));
|
||||
}
|
||||
EXPECT_THROW(SonicDBConfig::initialize(nonexisting_file), runtime_error);
|
||||
|
||||
EXPECT_FALSE(SonicDBConfig::isInit());
|
||||
|
||||
// load local config file, init should be true
|
||||
SonicDBConfig::initialize(existing_file);
|
||||
cout<<"INIT: load local db config file, isInit = "<<SonicDBConfig::isInit()<<endl;
|
||||
cout << "INIT: load local db config file, isInit = " << SonicDBConfig::isInit() << endl;
|
||||
EXPECT_TRUE(SonicDBConfig::isInit());
|
||||
|
||||
// Test the database_global.json file
|
||||
// by default , global_init should be false
|
||||
cout<<"Default : isGlobalInit = "<<SonicDBConfig::isGlobalInit()<<endl;
|
||||
cout << "Default : isGlobalInit = " << SonicDBConfig::isGlobalInit() << endl;
|
||||
EXPECT_FALSE(SonicDBConfig::isGlobalInit());
|
||||
|
||||
// Call an API which actually needs the data populated by SonicDBConfig::initializeGlobalConfig
|
||||
try
|
||||
{
|
||||
cout<<"INIT: Invoking SonicDBConfig::getDbId(APPL_DB, asic0)"<<endl;
|
||||
SonicDBConfig::getDbId(TEST_DB, TEST_NAMESPACE);
|
||||
}
|
||||
catch (exception &e)
|
||||
{
|
||||
EXPECT_TRUE(strstr(e.what(), "Initialize global DB config using API SonicDBConfig::initializeGlobalConfig"));
|
||||
}
|
||||
EXPECT_THROW(SonicDBConfig::getDbId(TEST_DB, TEST_NAMESPACE), runtime_error);
|
||||
|
||||
// load local global file, init should be true
|
||||
SonicDBConfig::initializeGlobalConfig(global_existing_file);
|
||||
cout<<"INIT: load global db config file, isInit = "<<SonicDBConfig::isGlobalInit()<<endl;
|
||||
cout << "INIT: load global db config file, isInit = " << SonicDBConfig::isGlobalInit() << endl;
|
||||
EXPECT_TRUE(SonicDBConfig::isGlobalInit());
|
||||
|
||||
// Call an API with wrong namespace passed
|
||||
try
|
||||
{
|
||||
cout<<"INIT: Invoking SonicDBConfig::getDbId(APPL_DB, invalid)"<<endl;
|
||||
SonicDBConfig::getDbId(TEST_DB, INVALID_NAMESPACE);
|
||||
}
|
||||
catch (exception &e)
|
||||
{
|
||||
EXPECT_TRUE(strstr(e.what(), "Namespace invalid is not a valid namespace name in config file"));
|
||||
}
|
||||
cout << "INIT: Invoking SonicDBConfig::getDbId(APPL_DB, invalid)" << endl;
|
||||
EXPECT_THROW(SonicDBConfig::getDbId(TEST_DB, INVALID_NAMESPACE), out_of_range);
|
||||
|
||||
// Get this info handy
|
||||
try
|
||||
|
Loading…
Reference in New Issue
Block a user