Fix contention for port streamStats across threads

Tx/Rx stream stats related threads no longer have a direct reference to the
port's stream stats - instead they have their own copy that they keep and
return (in a reset-on-read fashion) when asked for. Each copy also has it's
own lock to prevent contention for read/update/clear.

PcapPort now fetches Tx(Transmitter) and Rx(Poller) stats on demand and
updates the port's stream stats - under protection of a lock.
This commit is contained in:
Srivats P 2023-06-17 11:28:38 +05:30
parent bef0f1d162
commit 5ec7010c51
9 changed files with 97 additions and 30 deletions

View File

@ -887,6 +887,10 @@ void AbstractPort::streamStats(uint guid, OstProto::StreamStatsList *stats)
// In case stats are being maintained elsewhere
updateStreamStats();
// Lock for read here as updateStreamStats() above will take write lock
// and the lock is NOT recursive
QReadLocker lock(&streamStatsLock_);
if (streamStats_.contains(guid))
{
StreamStatsTuple sst = streamStats_.value(guid);
@ -910,6 +914,10 @@ void AbstractPort::streamStatsAll(OstProto::StreamStatsList *stats)
// In case stats are being maintained elsewhere
updateStreamStats();
// Lock for read here as updateStreamStats() above will take write lock
// and the lock is NOT recursive
QReadLocker lock(&streamStatsLock_);
// FIXME: change input param to a non-OstProto type and/or have
// a getFirst/Next like API?
double txDur = lastTransmitDuration();
@ -935,12 +943,14 @@ void AbstractPort::streamStatsAll(OstProto::StreamStatsList *stats)
void AbstractPort::resetStreamStats(uint guid)
{
QWriteLocker lock(&streamStatsLock_);
streamStats_.remove(guid);
clearStreamTiming(guid);
}
void AbstractPort::resetStreamStatsAll()
{
QWriteLocker lock(&streamStatsLock_);
streamStats_.clear();
clearStreamTiming();
}

View File

@ -24,6 +24,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "streamstats.h"
#include <QList>
#include <QReadWriteLock>
#include <QtGlobal>
#include <limits.h>
@ -163,6 +164,7 @@ protected:
quint64 maxStatsValue_;
struct PortStats stats_;
StreamStats streamStats_;
QReadWriteLock streamStatsLock_;
//! \todo Need lock for stats access/update
const uint kTtagTimeInterval_{5}; // in seconds

View File

@ -31,11 +31,11 @@ PcapPort::PcapPort(int id, const char *device)
{
monitorRx_ = new PortMonitor(device, kDirectionRx, &stats_);
monitorTx_ = new PortMonitor(device, kDirectionTx, &stats_);
transmitter_ = new PcapTransmitter(device, streamStats_);
transmitter_ = new PcapTransmitter(device);
capturer_ = new PortCapturer(device);
emulXcvr_ = new EmulationTransceiver(device, deviceManager_);
txTtagStatsPoller_ = new PcapTxTtagStats(device, id);
rxStatsPoller_ = new PcapRxStats(device, streamStats_, id);
rxStatsPoller_ = new PcapRxStats(device, id);
if (!monitorRx_->handle() || !monitorTx_->handle())
isUsable_ = false;
@ -148,9 +148,14 @@ bool PcapPort::setRateAccuracy(AbstractPort::Accuracy accuracy)
void PcapPort::updateStreamStats()
{
// XXX: PcapTxThread already does this at the end of transmit; we
// just dump tx/rx stats poller debug stats here
QWriteLocker lock(&streamStatsLock_);
// XXX: Transmitter may also 'adjust' rx stats in some cases (pcap
// direction not supported platforms)
transmitter_->updateTxRxStreamStats(streamStats_);
rxStatsPoller_->updateRxStreamStats(streamStats_);
// Dump tx/rx stats poller debug stats
qDebug("port %d txTtagStatsPoller: %s",
id(), qUtf8Printable(txTtagStatsPoller_->debugStats()));
qDebug("port %d rxStatsPoller: %s",

View File

@ -26,8 +26,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#define Xnotify qWarning // FIXME
PcapRxStats::PcapRxStats(const char *device, StreamStats &portStreamStats, int id)
: streamStats_(portStreamStats)
PcapRxStats::PcapRxStats(const char *device, int id)
{
setObjectName(QString("Rx$:%1").arg(device));
device_ = QString::fromLatin1(device);
@ -215,3 +214,21 @@ bool PcapRxStats::isDirectional()
{
return isDirectional_;
}
// XXX: Stats are reset on read
void PcapRxStats::updateRxStreamStats(StreamStats &streamStats)
{
QMutexLocker lock(&streamStatsLock_);
StreamStatsIterator i(streamStats_);
while (i.hasNext())
{
i.next();
uint guid = i.key();
StreamStatsTuple sst = i.value();
streamStats[guid].rx_pkts += sst.rx_pkts;
streamStats[guid].rx_bytes += sst.rx_bytes;
}
streamStats_.clear();
}

View File

@ -24,12 +24,14 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "pcapsession.h"
#include <QMutex>
class StreamTiming;
class PcapRxStats: public PcapSession
{
public:
PcapRxStats(const char *device, StreamStats &portStreamStats, int id);
PcapRxStats(const char *device, int id);
pcap_t* handle();
void run();
bool start();
@ -37,6 +39,7 @@ public:
bool isRunning();
bool isDirectional();
void updateRxStreamStats(StreamStats &streamStats); // Reset on read
private:
enum State {
kNotStarted,
@ -45,7 +48,8 @@ private:
};
QString device_;
StreamStats &streamStats_;
StreamStats streamStats_;
QMutex streamStatsLock_;
volatile bool stop_;
volatile State state_;
bool isDirectional_;

View File

@ -20,9 +20,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "pcaptransmitter.h"
PcapTransmitter::PcapTransmitter(
const char *device,
StreamStats &portStreamStats)
: streamStats_(portStreamStats), txThread_(device)
const char *device)
: txThread_(device)
{
adjustRxStreamStats_ = false;
txStats_.setObjectName(QString("TxStats:%1").arg(device));
@ -55,6 +54,31 @@ bool PcapTransmitter::setStreamStatsTracking(bool enable)
return txThread_.setStreamStatsTracking(enable);
}
// XXX: Stats are reset on read
void PcapTransmitter::updateTxRxStreamStats(StreamStats &streamStats)
{
QMutexLocker lock(&streamStatsLock_);
StreamStatsIterator i(streamStats_);
while (i.hasNext())
{
i.next();
uint guid = i.key();
StreamStatsTuple sst = i.value();
streamStats[guid].tx_pkts += sst.tx_pkts;
streamStats[guid].tx_bytes += sst.tx_bytes;
if (adjustRxStreamStats_) {
// XXX: rx_pkts counting may lag behind tx_pkts, so stream stats
// may become negative after adjustment transiently. But this
// should fix itself once all the rx pkts come in
streamStats[guid].rx_pkts -= sst.tx_pkts;
streamStats[guid].rx_bytes -= sst.tx_bytes;
}
}
streamStats_.clear();
}
void PcapTransmitter::clearPacketList()
{
txThread_.clearPacketList();
@ -114,6 +138,7 @@ bool PcapTransmitter::isRunning()
{
return txThread_.isRunning();
}
double PcapTransmitter::lastTxDuration()
{
return txThread_.lastTxDuration();
@ -121,8 +146,9 @@ double PcapTransmitter::lastTxDuration()
void PcapTransmitter::updateTxThreadStreamStats()
{
QMutexLocker lock(&streamStatsLock_);
PcapTxThread *txThread = dynamic_cast<PcapTxThread*>(sender());
const StreamStats& threadStreamStats = txThread->streamStats();
StreamStats threadStreamStats = txThread->streamStats();
StreamStatsIterator i(threadStreamStats);
while (i.hasNext())
@ -133,13 +159,5 @@ void PcapTransmitter::updateTxThreadStreamStats()
streamStats_[guid].tx_pkts += sst.tx_pkts;
streamStats_[guid].tx_bytes += sst.tx_bytes;
if (adjustRxStreamStats_) {
// XXX: rx_pkts counting may lag behind tx_pkts, so stream stats
// may become negative after adjustment transiently. But this
// should fix itself once all the rx pkts come in
streamStats_[guid].rx_pkts -= sst.tx_pkts;
streamStats_[guid].rx_bytes -= sst.tx_bytes;
}
}
txThread->clearStreamStats();
}

View File

@ -29,12 +29,13 @@ class PcapTransmitter : QObject
{
Q_OBJECT
public:
PcapTransmitter(const char *device, StreamStats &portStreamStats);
PcapTransmitter(const char *device);
~PcapTransmitter();
bool setRateAccuracy(AbstractPort::Accuracy accuracy);
bool setStreamStatsTracking(bool enable);
void adjustRxStreamStats(bool enable);
void updateTxRxStreamStats(StreamStats &streamStats); // Reset on read
void clearPacketList();
void loopNextPacketSet(qint64 size, qint64 repeats,
@ -54,7 +55,8 @@ public:
private slots:
void updateTxThreadStreamStats();
private:
StreamStats &streamStats_;
StreamStats streamStats_;
QMutex streamStatsLock_;
PcapTxThread txThread_;
PcapTxStats txStats_;
StatsTuple stats_;

View File

@ -236,14 +236,20 @@ void PcapTxThread::setStats(StatsTuple *stats)
stats_ = stats;
}
const StreamStats& PcapTxThread::streamStats()
StreamStats PcapTxThread::streamStats()
{
return streamStats_;
}
// This function is typically called in client-specific-RPC-thread
// context; hence different client RPC threads may call this function,
// so use a lock. Although RPCs are protected by the portLock just
// for this purpose, the streamStats RPC takes a Read lock, so it can
// still happen that multiple RPC threads land up here - that's why
// this lock is required
QMutexLocker lock(&streamStatsLock_);
void PcapTxThread::clearStreamStats()
{
streamStats_.clear();
StreamStats ss(streamStats_); // Make a copy
streamStats_.clear(); // Reset on read semantics
return ss; // Return copy
}
void PcapTxThread::run()
@ -531,6 +537,8 @@ int PcapTxThread::sendQueueTransmit(pcap_t *p, PacketSequence *seq,
void PcapTxThread::updateTxStreamStats()
{
QMutexLocker lock(&streamStatsLock_);
// If no packets in list, nothing to be done
if (!packetListSize_)
return;

View File

@ -24,6 +24,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "packetsequence.h"
#include "statstuple.h"
#include <QMutex>
#include <QThread>
#include <pcap.h>
@ -48,8 +49,7 @@ public:
void setStats(StatsTuple *stats);
const StreamStats& streamStats();
void clearStreamStats();
StreamStats streamStats(); // reset on read
void run();
@ -94,6 +94,7 @@ private:
StatsTuple *stats_;
StatsTuple lastStats_;
StreamStats streamStats_;
QMutex streamStatsLock_;
quint8 ttagId_{0};
double lastTxDuration_{0.0}; // in secs