Add fix for deserializing XSUB/XPUB subscription message (#16598)
### Why I did it ##### Work item tracking - Microsoft ADO **(number only)**:24851367 #### How I did it Read subscription message when capture service starts, before reading cached events. #### How to verify it UT/Manual testing
This commit is contained in:
parent
8cc1998e74
commit
d48c272677
@ -1,6 +1,7 @@
|
|||||||
#include <thread>
|
#include <thread>
|
||||||
#include "eventd.h"
|
#include "eventd.h"
|
||||||
#include "dbconnector.h"
|
#include "dbconnector.h"
|
||||||
|
#include "zmq.h"
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* There are 5 threads, including the main
|
* There are 5 threads, including the main
|
||||||
@ -355,6 +356,7 @@ capture_service::do_capture()
|
|||||||
int init_cnt;
|
int init_cnt;
|
||||||
void *cap_sub_sock = NULL;
|
void *cap_sub_sock = NULL;
|
||||||
counters_t total_overflow = 0;
|
counters_t total_overflow = 0;
|
||||||
|
static bool init_done = false;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
/*
|
/*
|
||||||
@ -391,6 +393,25 @@ capture_service::do_capture()
|
|||||||
|
|
||||||
m_cap_run = true;
|
m_cap_run = true;
|
||||||
|
|
||||||
|
if(!init_done) {
|
||||||
|
zmq_msg_t msg;
|
||||||
|
zmq_msg_init(&msg);
|
||||||
|
int rc = zmq_msg_recv(&msg, cap_sub_sock, 0);
|
||||||
|
RET_ON_ERR(rc == 1, "Failed to read subscription message when XSUB connects to XPUB");
|
||||||
|
/*
|
||||||
|
* When XSUB socket connects to XPUB, a subscription message is sent as a single byte 1.
|
||||||
|
* When capture service begins to read, the very first message that it will read is this
|
||||||
|
* control character.
|
||||||
|
*
|
||||||
|
* We will handle by reading this message and dropping it before we begin reading for
|
||||||
|
* cached events.
|
||||||
|
*
|
||||||
|
* This behavior will only happen once when XSUB connects to XPUB not everytime cache is started.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
init_done = true;
|
||||||
|
}
|
||||||
|
|
||||||
while (m_ctrl != START_CAPTURE) {
|
while (m_ctrl != START_CAPTURE) {
|
||||||
/* Wait for capture start */
|
/* Wait for capture start */
|
||||||
this_thread::sleep_for(chrono::milliseconds(10));
|
this_thread::sleep_for(chrono::milliseconds(10));
|
||||||
|
@ -159,12 +159,20 @@ void run_cap(void *zctx, bool &term, string &read_source,
|
|||||||
internal_event_t ev_int;
|
internal_event_t ev_int;
|
||||||
int block_ms = 200;
|
int block_ms = 200;
|
||||||
int i=0;
|
int i=0;
|
||||||
|
static int proxy_finished_init = false;
|
||||||
|
|
||||||
EXPECT_TRUE(NULL != mock_cap);
|
EXPECT_TRUE(NULL != mock_cap);
|
||||||
EXPECT_EQ(0, zmq_connect(mock_cap, get_config(CAPTURE_END_KEY).c_str()));
|
EXPECT_EQ(0, zmq_connect(mock_cap, get_config(CAPTURE_END_KEY).c_str()));
|
||||||
EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_SUBSCRIBE, "", 0));
|
EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_SUBSCRIBE, "", 0));
|
||||||
EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_RCVTIMEO, &block_ms, sizeof (block_ms)));
|
EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_RCVTIMEO, &block_ms, sizeof (block_ms)));
|
||||||
|
|
||||||
|
if(!proxy_finished_init) {
|
||||||
|
zmq_msg_t msg;
|
||||||
|
zmq_msg_init(&msg);
|
||||||
|
EXPECT_EQ(1, zmq_msg_recv(&msg, mock_cap, 0)); // Subscription message
|
||||||
|
proxy_finished_init = true;
|
||||||
|
}
|
||||||
|
|
||||||
while(!term) {
|
while(!term) {
|
||||||
string source;
|
string source;
|
||||||
internal_event_t ev_int;
|
internal_event_t ev_int;
|
||||||
|
Loading…
Reference in New Issue
Block a user