Integrate StreamTiming with the code

Bugs found during integration were fixed and minor code improvements were made
such as using consts, const params, renaming members etc.
This commit is contained in:
Srivats P 2023-03-31 16:55:03 +05:30
parent f4c21e1ae4
commit 05335b31d5
12 changed files with 159 additions and 35 deletions

View File

@ -292,6 +292,7 @@ message StreamStats {
required StreamGuid stream_guid = 2; required StreamGuid stream_guid = 2;
optional double tx_duration = 3; // in seconds optional double tx_duration = 3; // in seconds
optional uint64 delay = 4; // in nanoseconds
optional uint64 rx_pkts = 11; optional uint64 rx_pkts = 11;
optional uint64 rx_bytes = 12; optional uint64 rx_bytes = 12;

View File

@ -250,7 +250,7 @@ bool SignProtocol::packetTtagId(const uchar *pkt, int pktLen, uint *ttagId, uint
if (magic != kSignMagic) if (magic != kSignMagic)
return ret; return ret;
*guid = 0xffffffff; // invalid GUID *guid = kInvalidGuid;
p--; p--;
while (*p != kTypeLenEnd) { while (*p != kTypeLenEnd) {
if (*p == kTypeLenTtag) { if (*p == kTypeLenTtag) {

View File

@ -23,6 +23,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "abstractprotocol.h" #include "abstractprotocol.h"
#include "sign.pb.h" #include "sign.pb.h"
#include <limits.h>
/* /*
Sign Protocol is expected at the end of the frame (just before the Eth FCS) Sign Protocol is expected at the end of the frame (just before the Eth FCS)
---+--------+-------+ ---+--------+-------+
@ -89,6 +91,7 @@ public:
static bool packetTtagId(const uchar *pkt, int pktLen, uint *ttagId, uint *guid); static bool packetTtagId(const uchar *pkt, int pktLen, uint *ttagId, uint *guid);
static const quint32 kMaxGuid = 0x00ffffff; static const quint32 kMaxGuid = 0x00ffffff;
static const quint32 kInvalidGuid = UINT_MAX;
static const quint8 kTypeLenTtagPlaceholder = 0x22; static const quint8 kTypeLenTtagPlaceholder = 0x22;
static const quint8 kTypeLenTtag = 0x23; static const quint8 kTypeLenTtag = 0x23;
private: private:

View File

@ -25,6 +25,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "devicemanager.h" #include "devicemanager.h"
#include "interfaceinfo.h" #include "interfaceinfo.h"
#include "packetbuffer.h" #include "packetbuffer.h"
#include "streamtiming.h"
#include <QString> #include <QString>
#include <QIODevice> #include <QIODevice>
@ -54,6 +55,8 @@ AbstractPort::AbstractPort(int id, const char *device)
maxStatsValue_ = ULLONG_MAX; // assume 64-bit stats maxStatsValue_ = ULLONG_MAX; // assume 64-bit stats
memset((void*) &stats_, 0, sizeof(stats_)); memset((void*) &stats_, 0, sizeof(stats_));
resetStats(); resetStats();
streamTiming_ = StreamTiming::instance();
} }
AbstractPort::~AbstractPort() AbstractPort::~AbstractPort()
@ -734,6 +737,16 @@ void AbstractPort::stats(PortStats *stats)
stats_.rxFrameErrors + (maxStatsValue_ - epochStats_.rxFrameErrors); stats_.rxFrameErrors + (maxStatsValue_ - epochStats_.rxFrameErrors);
} }
quint64 AbstractPort::streamTimingDelay(uint guid)
{
return streamTiming_->delay(id(), guid);
}
void AbstractPort::clearStreamTiming(uint guid)
{
streamTiming_->clear(id(), guid);
}
void AbstractPort::streamStats(uint guid, OstProto::StreamStatsList *stats) void AbstractPort::streamStats(uint guid, OstProto::StreamStatsList *stats)
{ {
// In case stats are being maintained elsewhere // In case stats are being maintained elsewhere
@ -748,6 +761,7 @@ void AbstractPort::streamStats(uint guid, OstProto::StreamStatsList *stats)
s->mutable_port_id()->set_id(id()); s->mutable_port_id()->set_id(id());
s->set_tx_duration(lastTransmitDuration()); s->set_tx_duration(lastTransmitDuration());
s->set_delay(streamTimingDelay(guid));
s->set_tx_pkts(sst.tx_pkts); s->set_tx_pkts(sst.tx_pkts);
s->set_tx_bytes(sst.tx_bytes); s->set_tx_bytes(sst.tx_bytes);
@ -775,6 +789,7 @@ void AbstractPort::streamStatsAll(OstProto::StreamStatsList *stats)
s->mutable_port_id()->set_id(id()); s->mutable_port_id()->set_id(id());
s->set_tx_duration(txDur); s->set_tx_duration(txDur);
s->set_delay(streamTimingDelay(i.key()));
s->set_tx_pkts(sst.tx_pkts); s->set_tx_pkts(sst.tx_pkts);
s->set_tx_bytes(sst.tx_bytes); s->set_tx_bytes(sst.tx_bytes);
@ -786,11 +801,13 @@ void AbstractPort::streamStatsAll(OstProto::StreamStatsList *stats)
void AbstractPort::resetStreamStats(uint guid) void AbstractPort::resetStreamStats(uint guid)
{ {
streamStats_.remove(guid); streamStats_.remove(guid);
clearStreamTiming(guid);
} }
void AbstractPort::resetStreamStatsAll() void AbstractPort::resetStreamStatsAll()
{ {
streamStats_.clear(); streamStats_.clear();
clearStreamTiming();
} }
void AbstractPort::clearDeviceNeighbors() void AbstractPort::clearDeviceNeighbors()

View File

@ -20,18 +20,20 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#ifndef _SERVER_ABSTRACT_PORT_H #ifndef _SERVER_ABSTRACT_PORT_H
#define _SERVER_ABSTRACT_PORT_H #define _SERVER_ABSTRACT_PORT_H
#include "../common/protocol.pb.h"
#include "streamstats.h" #include "streamstats.h"
#include <QList> #include <QList>
#include <QtGlobal> #include <QtGlobal>
#include "../common/protocol.pb.h" #include <limits.h>
class DeviceManager; class DeviceManager;
struct InterfaceInfo; struct InterfaceInfo;
class StreamBase;
class PacketBuffer; class PacketBuffer;
class QIODevice; class QIODevice;
class StreamBase;
class StreamTiming;
// TODO: send notification back to client(s) // TODO: send notification back to client(s)
#define Xnotify qWarning #define Xnotify qWarning
@ -120,6 +122,9 @@ public:
void stats(PortStats *stats); void stats(PortStats *stats);
void resetStats() { epochStats_ = stats_; } void resetStats() { epochStats_ = stats_; }
quint64 streamTimingDelay(uint guid);
void clearStreamTiming(uint guid = UINT_MAX);
// FIXME: combine single and All calls? // FIXME: combine single and All calls?
void streamStats(uint guid, OstProto::StreamStatsList *stats); void streamStats(uint guid, OstProto::StreamStatsList *stats);
void streamStatsAll(OstProto::StreamStatsList *stats); void streamStatsAll(OstProto::StreamStatsList *stats);
@ -177,6 +182,8 @@ private:
QList<StreamBase*> streamList_; QList<StreamBase*> streamList_;
struct PortStats epochStats_; struct PortStats epochStats_;
StreamTiming *streamTiming_{nullptr};
}; };
#endif #endif

View File

@ -21,6 +21,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "pcapextra.h" #include "pcapextra.h"
#include "../common/sign.h" #include "../common/sign.h"
#include "streamtiming.h"
#define Xnotify qWarning // FIXME #define Xnotify qWarning // FIXME
@ -35,6 +36,8 @@ PcapRxStats::PcapRxStats(const char *device, StreamStats &portStreamStats, int i
handle_ = NULL; handle_ = NULL;
id_ = id; id_ = id;
timing_ = StreamTiming::instance();
} }
pcap_t* PcapRxStats::handle() pcap_t* PcapRxStats::handle()
@ -117,8 +120,13 @@ _skip_filter:
ret = pcap_next_ex(handle_, &hdr, &data); ret = pcap_next_ex(handle_, &hdr, &data);
switch (ret) { switch (ret) {
case 1: { case 1: {
uint guid; uint ttagId, guid;
if (SignProtocol::packetGuid(data, hdr->caplen, &guid)) { if (SignProtocol::packetTtagId(data, hdr->caplen, &ttagId, &guid)) {
timing_->recordRxTime(id_, guid, ttagId, hdr->ts);
qDebug("XXXXX [%d RX] %ld:%ld ttag %u guid %u", id_,
hdr->ts.tv_sec, long(hdr->ts.tv_usec), ttagId, guid);
}
if (guid != SignProtocol::kInvalidGuid) {
streamStats_[guid].rx_pkts++; streamStats_[guid].rx_pkts++;
streamStats_[guid].rx_bytes += hdr->caplen; streamStats_[guid].rx_bytes += hdr->caplen;
} }

View File

@ -24,6 +24,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "pcapsession.h" #include "pcapsession.h"
class StreamTiming;
class PcapRxStats: public PcapSession class PcapRxStats: public PcapSession
{ {
public: public:
@ -48,7 +50,9 @@ private:
volatile State state_; volatile State state_;
bool isDirectional_; bool isDirectional_;
int id_; int id_; // FIXME: rename to portId_
StreamTiming *timing_{nullptr};
}; };
#endif #endif

View File

@ -24,6 +24,9 @@ QString PcapSession::debugStats()
{ {
QString dbgStats; QString dbgStats;
if (!handle_)
return QString();
#ifdef Q_OS_WIN32 #ifdef Q_OS_WIN32
static_assert(sizeof(struct pcap_stat) == 6*sizeof(uint), static_assert(sizeof(struct pcap_stat) == 6*sizeof(uint),
"pcap_stat has less or more than 6 values"); "pcap_stat has less or more than 6 values");

View File

@ -21,14 +21,17 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "pcapextra.h" #include "pcapextra.h"
#include "../common/sign.h" #include "../common/sign.h"
#include "streamtiming.h"
#define Xnotify qWarning // FIXME #define Xnotify qWarning // FIXME
PcapTxTtagStats::PcapTxTtagStats(const char *device, int id) PcapTxTtagStats::PcapTxTtagStats(const char *device, int id)
: id_(id) : portId_(id)
{ {
setObjectName(QString("TxTtagStats:%1").arg(device)); setObjectName(QString("TxTtagStats:%1").arg(device));
device_ = QString::fromLatin1(device); device_ = QString::fromLatin1(device);
timing_ = StreamTiming::instance();
} }
void PcapTxTtagStats::run() void PcapTxTtagStats::run()
@ -37,9 +40,10 @@ void PcapTxTtagStats::run()
char errbuf[PCAP_ERRBUF_SIZE] = ""; char errbuf[PCAP_ERRBUF_SIZE] = "";
struct bpf_program bpf; struct bpf_program bpf;
const int optimize = 1; const int optimize = 1;
QString capture_filter = QString("(ether[len - 5:5] == 0x%1%2)") QString capture_filter = QString(
.arg(SignProtocol::kTypeLenTtag, 0, BASE_HEX) "(ether[len-4:4] == 0x%1) and (ether[len-5:1] == 0x%2")
.arg(SignProtocol::magic(), 0, BASE_HEX); .arg(SignProtocol::magic(), 0, BASE_HEX)
.arg(SignProtocol::kTypeLenTtag, 0, BASE_HEX);
qDebug("In %s", __PRETTY_FUNCTION__); qDebug("In %s", __PRETTY_FUNCTION__);
qDebug("pcap-filter: %s", qPrintable(capture_filter)); qDebug("pcap-filter: %s", qPrintable(capture_filter));
@ -102,8 +106,8 @@ _skip_filter:
uint guid; uint guid;
if (SignProtocol::packetTtagId(data, hdr->caplen, if (SignProtocol::packetTtagId(data, hdr->caplen,
&ttagId, &guid)) { &ttagId, &guid)) {
// TODO: store packet timestamp with ttag/guid timing_->recordTxTime(portId_, guid, ttagId, hdr->ts);
qDebug("XXXXX [%d] %ld:%ld ttag %u guid %u", id_, qDebug("XXXXX [%d TX] %ld:%ld ttag %u guid %u", portId_,
hdr->ts.tv_sec, long(hdr->ts.tv_usec), ttagId, guid); hdr->ts.tv_sec, long(hdr->ts.tv_usec), ttagId, guid);
} }
break; break;
@ -179,12 +183,15 @@ bool PcapTxTtagStats::isDirectional()
return isDirectional_; return isDirectional_;
} }
// FIXME: move to PcapSession // FIXME: moved to PcapSession, remove from here
// XXX: Implemented as reset on read // XXX: Implemented as reset on read
QString PcapTxTtagStats::debugStats() QString PcapTxTtagStats::debugStats()
{ {
QString dbgStats; QString dbgStats;
if (!handle_)
return QString();
#ifdef Q_OS_WIN32 #ifdef Q_OS_WIN32
static_assert(sizeof(struct pcap_stat) == 6*sizeof(uint), static_assert(sizeof(struct pcap_stat) == 6*sizeof(uint),
"pcap_stat has less or more than 6 values"); "pcap_stat has less or more than 6 values");

View File

@ -22,6 +22,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "pcapsession.h" #include "pcapsession.h"
class StreamTiming;
class PcapTxTtagStats: public PcapSession class PcapTxTtagStats: public PcapSession
{ {
public: public:
@ -48,8 +50,10 @@ private:
volatile State state_{kNotStarted}; volatile State state_{kNotStarted};
volatile bool stop_{false}; volatile bool stop_{false};
int id_; // FIXME: needed? int portId_;
struct pcap_stat lastPcapStats_; struct pcap_stat lastPcapStats_;
StreamTiming *timing_{nullptr};
}; };
#endif #endif

View File

@ -21,48 +21,51 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "timestamp.h" #include "timestamp.h"
#include <QCoreApplication>
StreamTiming::StreamTiming() StreamTiming::StreamTiming()
: QObject(nullptr) // FIXME: parent : QObject(nullptr) // FIXME: parent
{ {
// This class must be part of the main thread so that timers can work
Q_ASSERT(this->thread() == QCoreApplication::instance()->thread());
timer_ = new QTimer(this); timer_ = new QTimer(this);
connect(timer_, &QTimer::timeout, this, &StreamTiming::processRecords); connect(timer_, &QTimer::timeout, this, &StreamTiming::processRecords);
timer_->setInterval(3000); timer_->setInterval(3000);
timer_->start();
gcTimer_ = new QTimer(this); gcTimer_ = new QTimer(this);
connect(gcTimer_, &QTimer::timeout, this, &StreamTiming::deleteStaleRecords); connect(gcTimer_, &QTimer::timeout, this, &StreamTiming::deleteStaleRecords);
gcTimer_->setInterval(30000); gcTimer_->setInterval(30000);
//FIXME:gcTimer_->start();
} }
bool StreamTiming::recordTxTime(uint portId, uint guid, uint ttagId, bool StreamTiming::recordTxTime(uint portId, uint guid, uint ttagId,
struct timespec timestamp) const struct timespec &timestamp)
{ {
Key key = makeKey(guid, ttagId); TxRxKey key = makeKey(guid, ttagId);
TtagData value = { .timeStamp = timestamp, .portId = portId}; TtagData value = { .timeStamp = timestamp, .portId = portId};
QMutexLocker locker(&txHashLock_); QMutexLocker locker(&txHashLock_);
txHash_.insert(key, value); txHash_.insert(key, value);
if (!timer_->isActive())
timer_->start();
return true; return true;
} }
bool StreamTiming::recordRxTime(uint portId, uint guid, uint ttagId, bool StreamTiming::recordRxTime(uint portId, uint guid, uint ttagId,
struct timespec timestamp) const struct timespec &timestamp)
{ {
Key key = makeKey(guid, ttagId); TxRxKey key = makeKey(guid, ttagId);
TtagData value = { .timeStamp = timestamp, .portId = portId}; TtagData value = { .timeStamp = timestamp, .portId = portId};
QMutexLocker locker(&rxHashLock_); QMutexLocker locker(&rxHashLock_);
rxHash_.insert(key, value); rxHash_.insert(key, value);
if (!timer_->isActive())
timer_->start();
return true; return true;
} }
bool StreamTiming::recordTxTime(uint portId, uint guid, uint ttagId, bool StreamTiming::recordTxTime(uint portId, uint guid, uint ttagId,
struct timeval timestamp) const struct timeval &timestamp)
{ {
struct timespec ts; struct timespec ts;
ts.tv_sec = timestamp.tv_sec; ts.tv_sec = timestamp.tv_sec;
@ -72,7 +75,7 @@ bool StreamTiming::recordTxTime(uint portId, uint guid, uint ttagId,
} }
bool StreamTiming::recordRxTime(uint portId, uint guid, uint ttagId, bool StreamTiming::recordRxTime(uint portId, uint guid, uint ttagId,
struct timeval timestamp) const struct timeval &timestamp)
{ {
struct timespec ts; struct timespec ts;
ts.tv_sec = timestamp.tv_sec; ts.tv_sec = timestamp.tv_sec;
@ -81,6 +84,45 @@ bool StreamTiming::recordRxTime(uint portId, uint guid, uint ttagId,
return recordRxTime(portId, guid, ttagId, ts); return recordRxTime(portId, guid, ttagId, ts);
} }
quint64 StreamTiming::delay(uint portId, uint guid)
{
Q_ASSERT(guid <= SignProtocol::kMaxGuid);
QMutexLocker locker(&timingLock_);
if (!timing_.contains(portId))
return 0;
Timing t = timing_.value(portId)->value(guid);
if (t.countDelays == 0)
return 0;
qDebug("XXXX [%u/%u] %lldns", portId, guid,
timespecToNsecs(t.sumDelays)/t.countDelays);
return timespecToNsecs(t.sumDelays)/t.countDelays;
}
void StreamTiming::clear(uint portId, uint guid)
{
// XXX: We need to clear only the final timing hash; rx/tx hashes
// are cleared by StreamTiming itself as part of processRecords and
// deleteStaleRecords respectively
QMutexLocker locker(&timingLock_);
if (!timing_.contains(portId))
return;
PortTiming *portTiming = timing_.value(portId);
if (!portTiming)
return;
if (guid == SignProtocol::kInvalidGuid)
portTiming->clear(); // remove ALL guids
else
portTiming->remove(guid);
}
int StreamTiming::processRecords() int StreamTiming::processRecords()
{ {
// FIXME: yield after a certain count of records or time? // FIXME: yield after a certain count of records or time?
@ -97,11 +139,28 @@ int StreamTiming::processRecords()
struct timespec rxTime = i.value().timeStamp; struct timespec rxTime = i.value().timeStamp;
struct timespec diff; struct timespec diff;
timespecsub(&rxTime, &txTime, &diff); timespecsub(&rxTime, &txTime, &diff);
auto timingHash = timing_[i.value().portId];
auto guidTiming = (*timingHash)[i.key()]; uint guid = i.key() >> 8;
uint portId = i.value().portId;
if (!timing_.contains(portId))
timing_.insert(portId, new PortTiming);
PortTiming *portTiming = timing_.value(portId);
Timing &guidTiming = (*portTiming)[guid];
timespecadd(&guidTiming.sumDelays, &diff, &guidTiming.sumDelays); timespecadd(&guidTiming.sumDelays, &diff, &guidTiming.sumDelays);
guidTiming.countDelays++; guidTiming.countDelays++;
count++; count++;
qDebug("XXXXX [%u/%u] diff %ld.%ld (%ld.%ld - %ld.%ld)",
i.value().portId, i.key(),
diff.tv_sec, diff.tv_nsec,
rxTime.tv_sec, rxTime.tv_nsec,
txTime.tv_sec, txTime.tv_nsec);
qDebug("XXXXX %d:[%u/%u] total %ld.%ld count %u",
count, i.value().portId, i.key(),
guidTiming.sumDelays.tv_sec, guidTiming.sumDelays.tv_nsec,
guidTiming.countDelays);
} }
i = rxHash_.erase(i); i = rxHash_.erase(i);
} }
@ -136,11 +195,13 @@ int StreamTiming::deleteStaleRecords()
if (diff.tv_sec > 30) { if (diff.tv_sec > 30) {
i = txHash_.erase(i); i = txHash_.erase(i);
count++; count++;
qDebug("XXXX -%d", count);
} }
} }
// FIXME: when to stop gc timer? // FIXME: when to stop gc timer?
qDebug("XXXX garbage collected %d stale tx timing records", count);
return count; return count;
} }

View File

@ -20,6 +20,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#ifndef _STREAM_TIMING #ifndef _STREAM_TIMING
#define _STREAM_TIMING #define _STREAM_TIMING
#include "../common/sign.h"
#include <QHash> #include <QHash>
#include <QMutex> #include <QMutex>
#include <QTimer> #include <QTimer>
@ -33,14 +35,17 @@ public:
StreamTiming(); StreamTiming();
bool recordTxTime(uint portId, uint guid, uint ttagId, bool recordTxTime(uint portId, uint guid, uint ttagId,
struct timespec timestamp); const struct timespec &timestamp);
bool recordRxTime(uint portId, uint guid, uint ttagId, bool recordRxTime(uint portId, uint guid, uint ttagId,
struct timespec timestamp); const struct timespec &timestamp);
bool recordTxTime(uint portId, uint guid, uint ttagId, bool recordTxTime(uint portId, uint guid, uint ttagId,
struct timeval timestamp); const struct timeval &timestamp);
bool recordRxTime(uint portId, uint guid, uint ttagId, bool recordRxTime(uint portId, uint guid, uint ttagId,
struct timeval timestamp); const struct timeval &timestamp);
quint64 delay(uint portId, uint guid);
void clear(uint portId, uint guid = SignProtocol::kInvalidGuid);
static StreamTiming* instance(); static StreamTiming* instance();
@ -68,14 +73,18 @@ private:
}; };
// XXX: Key = guid (24 bit MSG), ttagid (8 bit LSB) // XXX: TxRxKey = guid (24 bit MSG) + ttagid (8 bit LSB)
// TODO: encode tx port in in packet and use as part of key // TODO: encode tx port in in packet and use as part of key
typedef quint32 Key; typedef quint32 TxRxKey;
QHash<Key, TtagData> txHash_; QHash<TxRxKey, TtagData> txHash_;
QHash<Key, TtagData> rxHash_; QHash<TxRxKey, TtagData> rxHash_;
QHash<uint, QHash<Key, Timing>*> timing_; // outer key => portId
QMutex txHashLock_; QMutex txHashLock_;
QMutex rxHashLock_; QMutex rxHashLock_;
typedef uint PortIdKey;
typedef uint GuidKey;
typedef QHash<GuidKey, Timing> PortTiming;
QHash<PortIdKey, PortTiming*> timing_;
QMutex timingLock_; // FIXME: change to RW lock? QMutex timingLock_; // FIXME: change to RW lock?
QTimer *timer_; // Periodic timer to process tx/rx records QTimer *timer_; // Periodic timer to process tx/rx records