Add fix for deserializing XSUB/XPUB subscription message (#16598)

### Why I did it

##### Work item tracking
- Microsoft ADO **(number only)**:24851367

#### How I did it

Read subscription message when capture service starts, before reading cached events.

#### How to verify it

UT/Manual testing
This commit is contained in:
Zain Budhwani 2023-09-26 16:59:43 -07:00 committed by GitHub
parent d89dde3b6d
commit 233a772f49
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 29 additions and 0 deletions

View File

@ -1,6 +1,7 @@
#include <thread> #include <thread>
#include "eventd.h" #include "eventd.h"
#include "dbconnector.h" #include "dbconnector.h"
#include "zmq.h"
/* /*
* There are 5 threads, including the main * There are 5 threads, including the main
@ -355,6 +356,7 @@ capture_service::do_capture()
int init_cnt; int init_cnt;
void *cap_sub_sock = NULL; void *cap_sub_sock = NULL;
counters_t total_overflow = 0; counters_t total_overflow = 0;
static bool init_done = false;
typedef enum { typedef enum {
/* /*
@ -391,6 +393,25 @@ capture_service::do_capture()
m_cap_run = true; m_cap_run = true;
if(!init_done) {
zmq_msg_t msg;
zmq_msg_init(&msg);
int rc = zmq_msg_recv(&msg, cap_sub_sock, 0);
RET_ON_ERR(rc == 1, "Failed to read subscription message when XSUB connects to XPUB");
/*
* When XSUB socket connects to XPUB, a subscription message is sent as a single byte 1.
* When capture service begins to read, the very first message that it will read is this
* control character.
*
* We will handle by reading this message and dropping it before we begin reading for
* cached events.
*
* This behavior will only happen once when XSUB connects to XPUB not everytime cache is started.
*
*/
init_done = true;
}
while (m_ctrl != START_CAPTURE) { while (m_ctrl != START_CAPTURE) {
/* Wait for capture start */ /* Wait for capture start */
this_thread::sleep_for(chrono::milliseconds(10)); this_thread::sleep_for(chrono::milliseconds(10));

View File

@ -159,12 +159,20 @@ void run_cap(void *zctx, bool &term, string &read_source,
internal_event_t ev_int; internal_event_t ev_int;
int block_ms = 200; int block_ms = 200;
int i=0; int i=0;
static int proxy_finished_init = false;
EXPECT_TRUE(NULL != mock_cap); EXPECT_TRUE(NULL != mock_cap);
EXPECT_EQ(0, zmq_connect(mock_cap, get_config(CAPTURE_END_KEY).c_str())); EXPECT_EQ(0, zmq_connect(mock_cap, get_config(CAPTURE_END_KEY).c_str()));
EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_SUBSCRIBE, "", 0)); EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_SUBSCRIBE, "", 0));
EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_RCVTIMEO, &block_ms, sizeof (block_ms))); EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_RCVTIMEO, &block_ms, sizeof (block_ms)));
if(!proxy_finished_init) {
zmq_msg_t msg;
zmq_msg_init(&msg);
EXPECT_EQ(1, zmq_msg_recv(&msg, mock_cap, 0)); // Subscription message
proxy_finished_init = true;
}
while(!term) { while(!term) {
string source; string source;
internal_event_t ev_int; internal_event_t ev_int;