Fix spurious stream stats drops

The problem happens for bidirectional flows. The sequence of events is
as follows when you start Tx on Ports p1, p2 with the current code -

1. Clear stream stats on p1
2. Start tx on p1
3. Clear stream stats on p2
4. Start tx on p2

By the time #3 is executed, it may have already rx packets from p1 which
are being incorrectly cleared, this will cause these number of packets
to show up as dropped instead - incorrectly.

The fix is to change the order like this -

1. Clear stream stats on p1
2. Clear stream stats on p2
3. Start tx on p1
4. Start tx on p2

Unidirectional flows will not see this problem - as long as startTx is
done only on the Tx port and not the Rx port.

This bug is a regression caused due to the code changes introduced for the
stream stats rates feature implemented in 1.2.0
This commit is contained in:
Srivats P 2023-02-08 16:18:02 +05:30
parent e2369c02bc
commit c70811eaa4
8 changed files with 100 additions and 13 deletions

View File

@ -429,13 +429,12 @@ void MyService::startTransmit(::google::protobuf::RpcController* /*controller*/,
// XXX: stream stats uses port tx duration to calculate per stream
// rates; tx duration is for the last tx run only - so stream stats
// should also correspond to the last run only.
// Hence clear stream stats before Tx
// Hence clear stream stats before Tx.
// XXX: clear stream stats on ALL ports in user provided list before
// starting Tx on any of them
for (int i = 0; i < request->port_id_size(); i++)
{
int portId;
int frameError = 0;
portId = request->port_id(i).id();
int portId = request->port_id(i).id();
if ((portId < 0) || (portId >= portInfo.size())) {
error = true;
notes += QString("Port %1 start transmit: invalid port\n")
@ -443,10 +442,23 @@ void MyService::startTransmit(::google::protobuf::RpcController* /*controller*/,
continue;
}
portLock[portId]->lockForWrite();
portInfo[portId]->resetStreamStatsAll();
portLock[portId]->unlock();
}
for (int i = 0; i < request->port_id_size(); i++)
{
int frameError = 0;
int portId = request->port_id(i).id();
if ((portId < 0) || (portId >= portInfo.size())) {
continue;
}
portLock[portId]->lockForWrite();
if (portInfo[portId]->isDirty())
frameError = portInfo[portId]->updatePacketList();
portInfo[portId]->resetStreamStatsAll();
portInfo[portId]->startTransmit();
portLock[portId]->unlock();
if (frameError) {

View File

@ -34,7 +34,7 @@ PcapPort::PcapPort(int id, const char *device)
transmitter_ = new PcapTransmitter(device, streamStats_);
capturer_ = new PortCapturer(device);
emulXcvr_ = new EmulationTransceiver(device, deviceManager_);
rxStatsPoller_ = new PcapRxStats(device, streamStats_);
rxStatsPoller_ = new PcapRxStats(device, streamStats_, id);
if (!monitorRx_->handle() || !monitorTx_->handle())
isUsable_ = false;
@ -142,6 +142,15 @@ bool PcapPort::setRateAccuracy(AbstractPort::Accuracy accuracy)
return false;
}
void PcapPort::updateStreamStats()
{
// XXX: PcapTxThread already does this at the end of transmit; we
// just dump rx stats poller debug stats here
qDebug("port %d rxStatsPoller: %s",
id(), qUtf8Printable(rxStatsPoller_->debugStats()));
}
void PcapPort::startDeviceEmulation()
{
emulXcvr_->start();

View File

@ -77,6 +77,8 @@ public:
virtual bool isCaptureOn() { return capturer_->isRunning(); }
virtual QIODevice* captureData() { return capturer_->captureFile(); }
virtual void updateStreamStats();
virtual void startDeviceEmulation();
virtual void stopDeviceEmulation();
virtual int sendEmulationPacket(PacketBuffer *pktBuf);

View File

@ -24,7 +24,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#define Xnotify qWarning // FIXME
PcapRxStats::PcapRxStats(const char *device, StreamStats &portStreamStats)
PcapRxStats::PcapRxStats(const char *device, StreamStats &portStreamStats, int id)
: streamStats_(portStreamStats)
{
device_ = QString::fromLatin1(device);
@ -33,6 +33,8 @@ PcapRxStats::PcapRxStats(const char *device, StreamStats &portStreamStats)
isDirectional_ = true;
handle_ = NULL;
id_ = id;
}
pcap_t* PcapRxStats::handle()
@ -104,6 +106,7 @@ void PcapRxStats::run()
}
_skip_filter:
memset(&lastPcapStats_, 0, sizeof(lastPcapStats_));
PcapSession::preRun();
state_ = kRunning;
while (1) {
@ -143,7 +146,6 @@ _skip_filter:
}
}
PcapSession::postRun();
pcap_close(handle_);
handle_ = NULL;
stop_ = false;
@ -191,3 +193,57 @@ bool PcapRxStats::isDirectional()
{
return isDirectional_;
}
// XXX: Implemented as reset on read
QString PcapRxStats::debugStats()
{
QString dbgStats;
#ifdef Q_OS_WIN32
static_assert(sizeof(struct pcap_stat) == 6*sizeof(uint),
"pcap_stat has less or more than 6 values");
int size;
struct pcap_stat incPcapStats;
struct pcap_stat *pcapStats = pcap_stats_ex(handle_, &size);
if (pcapStats && (uint(size) >= 6*sizeof(uint))) {
incPcapStats.ps_recv = pcapStats->ps_recv - lastPcapStats_.ps_recv;
incPcapStats.ps_drop = pcapStats->ps_drop - lastPcapStats_.ps_drop;
incPcapStats.ps_ifdrop = pcapStats->ps_ifdrop - lastPcapStats_.ps_ifdrop;
incPcapStats.ps_capt = pcapStats->ps_capt - lastPcapStats_.ps_capt;
incPcapStats.ps_sent = pcapStats->ps_sent - lastPcapStats_.ps_sent;
incPcapStats.ps_netdrop = pcapStats->ps_netdrop - lastPcapStats_.ps_netdrop;
dbgStats = QString("recv: %1 drop: %2 ifdrop: %3 "
"capt: %4 sent: %5 netdrop: %6")
.arg(incPcapStats.ps_recv)
.arg(incPcapStats.ps_drop)
.arg(incPcapStats.ps_ifdrop)
.arg(incPcapStats.ps_capt)
.arg(incPcapStats.ps_sent)
.arg(incPcapStats.ps_netdrop);
lastPcapStats_ = *pcapStats;
} else {
dbgStats = QString("error reading pcap stats: %1")
.arg(pcap_geterr(handle_));
}
#else
struct pcap_stat pcapStats;
struct pcap_stat incPcapStats;
int ret = pcap_stats(handle_, &pcapStats);
if (ret == 0) {
incPcapStats.ps_recv = pcapStats.ps_recv - lastPcapStats_.ps_recv;
incPcapStats.ps_drop = pcapStats.ps_drop - lastPcapStats_.ps_drop;
incPcapStats.ps_ifdrop = pcapStats.ps_ifdrop - lastPcapStats_.ps_ifdrop;
dbgStats = QString("recv: %1 drop: %2 ifdrop: %3")
.arg(incPcapStats.ps_recv)
.arg(incPcapStats.ps_drop)
.arg(incPcapStats.ps_ifdrop);
lastPcapStats_ = *pcapStats;
} else {
dbgStats = QString("error reading pcap stats: %1")
.arg(pcap_geterr(handle_));
}
#endif
return dbgStats;
}

View File

@ -27,7 +27,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
class PcapRxStats: public PcapSession
{
public:
PcapRxStats(const char *device, StreamStats &portStreamStats);
PcapRxStats(const char *device, StreamStats &portStreamStats, int id);
pcap_t* handle();
void run();
bool start();
@ -35,6 +35,8 @@ public:
bool isRunning();
bool isDirectional();
QString debugStats();
private:
enum State {
kNotStarted,
@ -48,6 +50,9 @@ private:
pcap_t *handle_;
volatile State state_;
bool isDirectional_;
int id_;
struct pcap_stat lastPcapStats_;
};
#endif

View File

@ -127,6 +127,9 @@ void PcapTransmitter::updateTxThreadStreamStats()
streamStats_[guid].tx_pkts += sst.tx_pkts;
streamStats_[guid].tx_bytes += sst.tx_bytes;
if (adjustRxStreamStats_) {
// XXX: rx_pkts counting may lag behind tx_pkts, so stream stats
// may become negative after adjustment transiently. But this
// should fix itself once all the rx pkts come in
streamStats_[guid].rx_pkts -= sst.tx_pkts;
streamStats_[guid].rx_bytes -= sst.tx_bytes;
}

View File

@ -365,7 +365,7 @@ _exit2:
//Q_ASSERT(lastTxDuration_ >= 0);
if (trackStreamStats_)
updateStreamStats();
updateTxStreamStats();
state_ = kFinished;
}
@ -466,7 +466,7 @@ int PcapTxThread::sendQueueTransmit(pcap_t *p,
return 0;
}
void PcapTxThread::updateStreamStats()
void PcapTxThread::updateTxStreamStats()
{
// If no packets in list, nothing to be done
if (!packetListSize_)

View File

@ -68,7 +68,7 @@ private:
static void udelay(unsigned long usec);
int sendQueueTransmit(pcap_t *p, pcap_send_queue *queue, long &overHead,
int sync);
void updateStreamStats();
void updateTxStreamStats();
// Intermediate state variables used while building the packet list
PacketSequence *currentPacketSequence_;