From 233a772f498343565757e2c37af72c7935af98c0 Mon Sep 17 00:00:00 2001 From: Zain Budhwani <99770260+zbud-msft@users.noreply.github.com> Date: Tue, 26 Sep 2023 16:59:43 -0700 Subject: [PATCH] Add fix for deserializing XSUB/XPUB subscription message (#16598) ### Why I did it ##### Work item tracking - Microsoft ADO **(number only)**:24851367 #### How I did it Read subscription message when capture service starts, before reading cached events. #### How to verify it UT/Manual testing --- src/sonic-eventd/src/eventd.cpp | 21 +++++++++++++++++++++ src/sonic-eventd/tests/eventd_ut.cpp | 8 ++++++++ 2 files changed, 29 insertions(+) diff --git a/src/sonic-eventd/src/eventd.cpp b/src/sonic-eventd/src/eventd.cpp index 31da3d6849..953fe9b7c4 100644 --- a/src/sonic-eventd/src/eventd.cpp +++ b/src/sonic-eventd/src/eventd.cpp @@ -1,6 +1,7 @@ #include #include "eventd.h" #include "dbconnector.h" +#include "zmq.h" /* * There are 5 threads, including the main @@ -355,6 +356,7 @@ capture_service::do_capture() int init_cnt; void *cap_sub_sock = NULL; counters_t total_overflow = 0; + static bool init_done = false; typedef enum { /* @@ -391,6 +393,25 @@ capture_service::do_capture() m_cap_run = true; + if(!init_done) { + zmq_msg_t msg; + zmq_msg_init(&msg); + int rc = zmq_msg_recv(&msg, cap_sub_sock, 0); + RET_ON_ERR(rc == 1, "Failed to read subscription message when XSUB connects to XPUB"); + /* + * When XSUB socket connects to XPUB, a subscription message is sent as a single byte 1. + * When capture service begins to read, the very first message that it will read is this + * control character. + * + * We will handle by reading this message and dropping it before we begin reading for + * cached events. + * + * This behavior will only happen once when XSUB connects to XPUB not everytime cache is started. + * + */ + init_done = true; + } + while (m_ctrl != START_CAPTURE) { /* Wait for capture start */ this_thread::sleep_for(chrono::milliseconds(10)); diff --git a/src/sonic-eventd/tests/eventd_ut.cpp b/src/sonic-eventd/tests/eventd_ut.cpp index e793ddf588..db46845b04 100644 --- a/src/sonic-eventd/tests/eventd_ut.cpp +++ b/src/sonic-eventd/tests/eventd_ut.cpp @@ -159,12 +159,20 @@ void run_cap(void *zctx, bool &term, string &read_source, internal_event_t ev_int; int block_ms = 200; int i=0; + static int proxy_finished_init = false; EXPECT_TRUE(NULL != mock_cap); EXPECT_EQ(0, zmq_connect(mock_cap, get_config(CAPTURE_END_KEY).c_str())); EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_SUBSCRIBE, "", 0)); EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_RCVTIMEO, &block_ms, sizeof (block_ms))); + if(!proxy_finished_init) { + zmq_msg_t msg; + zmq_msg_init(&msg); + EXPECT_EQ(1, zmq_msg_recv(&msg, mock_cap, 0)); // Subscription message + proxy_finished_init = true; + } + while(!term) { string source; internal_event_t ev_int;