[eventd] Add incremental polling when waiting for capture service to start (#18138)

### Why I did it

Addresses https://github.com/sonic-net/sonic-buildimage/issues/17350

### How I did it

Instead of polling at a fixed interval for at most 1 second in total, we now poll to check whether the capture thread has started and increase the delay after each poll. In situations where the system was low on memory, fixed-interval polling did not give the zmq capture service enough time to start. The incremental delay lets eventd wait longer for the capture service to start when the system is busy or overloaded, while the maximum per-poll delay and the retry limit ensure that we do not wait forever.

#### How to verify it
Unit tests (UT).
This commit is contained in:
Zain Budhwani 2024-03-14 19:00:11 -07:00 committed by GitHub
parent 9a6d6137a3
commit a53cbdc782
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 28 additions and 16 deletions

View File

@ -1,4 +1,5 @@
#include <thread>
#include <memory>
#include "eventd.h"
#include "dbconnector.h"
#include "zmq.h"
@ -539,6 +540,7 @@ int
capture_service::set_control(capture_control_t ctrl, event_serialized_lst_t *lst)
{
int ret = -1;
int duration = CAPTURE_SERVICE_POLLING_DURATION;
/* Can go in single step only. */
RET_ON_ERR((ctrl - m_ctrl) == 1, "m_ctrl(%d)+1 < ctrl(%d)", m_ctrl, ctrl);
@ -547,8 +549,9 @@ capture_service::set_control(capture_control_t ctrl, event_serialized_lst_t *lst
case INIT_CAPTURE:
m_thr = thread(&capture_service::do_capture, this);
for(int i=0; !m_cap_run && (i < CAPTURE_SERVICE_POLLING_RETRIES); ++i) {
/* Wait max a second for thread to init */
this_thread::sleep_for(chrono::milliseconds(CAPTURE_SERVICE_POLLING_DURATION));
/* Poll to see if thread has been init, if so exit early. Add delay on every attempt */
this_thread::sleep_for(chrono::milliseconds(duration));
duration = min(duration + CAPTURE_SERVICE_POLLING_INCREMENT, CAPTURE_SERVICE_POLLING_MAX_DURATION);
}
RET_ON_ERR(m_cap_run, "Failed to init capture");
m_ctrl = ctrl;
@ -646,7 +649,8 @@ run_eventd_service()
event_service service;
stats_collector stats_instance;
eventd_proxy *proxy = NULL;
capture_service *capture = NULL;
unique_ptr<capture_service> capture;
bool skip_caching = false;
event_serialized_lst_t capture_fifo_events;
last_events_t capture_last_events;
@ -676,9 +680,14 @@ run_eventd_service()
* events until telemetry starts.
* Telemetry will send a stop & collect cache upon startup
*/
capture = new capture_service(zctx, cache_max, &stats_instance);
RET_ON_ERR(capture->set_control(INIT_CAPTURE) == 0, "Failed to init capture");
RET_ON_ERR(capture->set_control(START_CAPTURE) == 0, "Failed to start capture");
capture = make_unique<capture_service>(zctx, cache_max, &stats_instance);
if (capture->set_control(INIT_CAPTURE) != 0) {
SWSS_LOG_WARN("Failed to initialize capture service, so we skip caching");
skip_caching = true;
capture.reset(); // Capture service will not be available
} else {
RET_ON_ERR(capture->set_control(START_CAPTURE) == 0, "Failed to start capture");
}
this_thread::sleep_for(chrono::milliseconds(200));
RET_ON_ERR(stats_instance.is_running(), "Failed to start stats instance");
@ -694,12 +703,12 @@ run_eventd_service()
case EVENT_CACHE_INIT:
/* connect only*/
if (capture != NULL) {
delete capture;
capture.reset();
}
event_serialized_lst_t().swap(capture_fifo_events);
last_events_t().swap(capture_last_events);
capture = new capture_service(zctx, cache_max, &stats_instance);
capture = make_unique<capture_service>(zctx, cache_max, &stats_instance);
if (capture != NULL) {
resp = capture->set_control(INIT_CAPTURE);
}
@ -708,7 +717,7 @@ run_eventd_service()
case EVENT_CACHE_START:
if (capture == NULL) {
SWSS_LOG_ERROR("Cache is not initialized to start");
SWSS_LOG_WARN("Cache is not initialized to start");
resp = -1;
break;
}
@ -721,7 +730,7 @@ run_eventd_service()
case EVENT_CACHE_STOP:
if (capture == NULL) {
SWSS_LOG_ERROR("Cache is not initialized to stop");
SWSS_LOG_WARN("Cache is not initialized to stop");
resp = -1;
break;
}
@ -731,8 +740,7 @@ run_eventd_service()
resp = capture->read_cache(capture_fifo_events, capture_last_events,
overflow);
}
delete capture;
capture = NULL;
capture.reset();
/* Unpause heartbeat upon stop caching */
stats_instance.heartbeat_ctrl();
@ -740,6 +748,11 @@ run_eventd_service()
case EVENT_CACHE_READ:
if (skip_caching) {
SWSS_LOG_WARN("Capture service is unavailable, skipping cache read");
resp = -1;
break;
}
if (capture != NULL) {
SWSS_LOG_ERROR("Cache is not stopped yet.");
resp = -1;
@ -802,13 +815,10 @@ out:
if (proxy != NULL) {
delete proxy;
}
if (capture != NULL) {
delete capture;
}
if (zctx != NULL) {
zmq_ctx_term(zctx);
}
SWSS_LOG_ERROR("Eventd service exiting\n");
SWSS_LOG_INFO("Eventd service exiting\n");
}
void set_unit_testing(bool b)

View File

@ -22,6 +22,8 @@ typedef enum {
#define EVENTS_STATS_FIELD_NAME "value"
#define STATS_HEARTBEAT_MIN 300
#define CAPTURE_SERVICE_POLLING_DURATION 10
#define CAPTURE_SERVICE_POLLING_INCREMENT 10
#define CAPTURE_SERVICE_POLLING_MAX_DURATION 100
#define CAPTURE_SERVICE_POLLING_RETRIES 100
/*