Integrate StreamTiming with the code

Bugs found during integration were fixed and minor code improvements were made
such as using consts, const params, renaming members etc.
This commit is contained in:
Srivats P 2023-03-31 16:55:03 +05:30
parent fc2d8408fa
commit ab713ce043
12 changed files with 159 additions and 35 deletions

View File

@ -292,6 +292,7 @@ message StreamStats {
required StreamGuid stream_guid = 2;
optional double tx_duration = 3; // in seconds
optional uint64 delay = 4; // in nanoseconds
optional uint64 rx_pkts = 11;
optional uint64 rx_bytes = 12;

View File

@ -250,7 +250,7 @@ bool SignProtocol::packetTtagId(const uchar *pkt, int pktLen, uint *ttagId, uint
if (magic != kSignMagic)
return ret;
*guid = 0xffffffff; // invalid GUID
*guid = kInvalidGuid;
p--;
while (*p != kTypeLenEnd) {
if (*p == kTypeLenTtag) {

View File

@ -23,6 +23,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "abstractprotocol.h"
#include "sign.pb.h"
#include <limits.h>
/*
Sign Protocol is expected at the end of the frame (just before the Eth FCS)
---+--------+-------+
@ -89,6 +91,7 @@ public:
static bool packetTtagId(const uchar *pkt, int pktLen, uint *ttagId, uint *guid);
static const quint32 kMaxGuid = 0x00ffffff;
static const quint32 kInvalidGuid = UINT_MAX;
static const quint8 kTypeLenTtagPlaceholder = 0x22;
static const quint8 kTypeLenTtag = 0x23;
private:

View File

@ -25,6 +25,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "devicemanager.h"
#include "interfaceinfo.h"
#include "packetbuffer.h"
#include "streamtiming.h"
#include <QString>
#include <QIODevice>
@ -54,6 +55,8 @@ AbstractPort::AbstractPort(int id, const char *device)
maxStatsValue_ = ULLONG_MAX; // assume 64-bit stats
memset((void*) &stats_, 0, sizeof(stats_));
resetStats();
streamTiming_ = StreamTiming::instance();
}
AbstractPort::~AbstractPort()
@ -734,6 +737,16 @@ void AbstractPort::stats(PortStats *stats)
stats_.rxFrameErrors + (maxStatsValue_ - epochStats_.rxFrameErrors);
}
quint64 AbstractPort::streamTimingDelay(uint guid)
{
return streamTiming_->delay(id(), guid);
}
void AbstractPort::clearStreamTiming(uint guid)
{
streamTiming_->clear(id(), guid);
}
void AbstractPort::streamStats(uint guid, OstProto::StreamStatsList *stats)
{
// In case stats are being maintained elsewhere
@ -748,6 +761,7 @@ void AbstractPort::streamStats(uint guid, OstProto::StreamStatsList *stats)
s->mutable_port_id()->set_id(id());
s->set_tx_duration(lastTransmitDuration());
s->set_delay(streamTimingDelay(guid));
s->set_tx_pkts(sst.tx_pkts);
s->set_tx_bytes(sst.tx_bytes);
@ -775,6 +789,7 @@ void AbstractPort::streamStatsAll(OstProto::StreamStatsList *stats)
s->mutable_port_id()->set_id(id());
s->set_tx_duration(txDur);
s->set_delay(streamTimingDelay(i.key()));
s->set_tx_pkts(sst.tx_pkts);
s->set_tx_bytes(sst.tx_bytes);
@ -786,11 +801,13 @@ void AbstractPort::streamStatsAll(OstProto::StreamStatsList *stats)
void AbstractPort::resetStreamStats(uint guid)
{
streamStats_.remove(guid);
clearStreamTiming(guid);
}
void AbstractPort::resetStreamStatsAll()
{
streamStats_.clear();
clearStreamTiming();
}
void AbstractPort::clearDeviceNeighbors()

View File

@ -20,18 +20,20 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#ifndef _SERVER_ABSTRACT_PORT_H
#define _SERVER_ABSTRACT_PORT_H
#include "../common/protocol.pb.h"
#include "streamstats.h"
#include <QList>
#include <QtGlobal>
#include "../common/protocol.pb.h"
#include <limits.h>
class DeviceManager;
struct InterfaceInfo;
class StreamBase;
class PacketBuffer;
class QIODevice;
class StreamBase;
class StreamTiming;
// TODO: send notification back to client(s)
#define Xnotify qWarning
@ -120,6 +122,9 @@ public:
void stats(PortStats *stats);
void resetStats() { epochStats_ = stats_; }
quint64 streamTimingDelay(uint guid);
void clearStreamTiming(uint guid = UINT_MAX);
// FIXME: combine single and All calls?
void streamStats(uint guid, OstProto::StreamStatsList *stats);
void streamStatsAll(OstProto::StreamStatsList *stats);
@ -177,6 +182,8 @@ private:
QList<StreamBase*> streamList_;
struct PortStats epochStats_;
StreamTiming *streamTiming_{nullptr};
};
#endif

View File

@ -21,6 +21,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "pcapextra.h"
#include "../common/sign.h"
#include "streamtiming.h"
#define Xnotify qWarning // FIXME
@ -35,6 +36,8 @@ PcapRxStats::PcapRxStats(const char *device, StreamStats &portStreamStats, int i
handle_ = NULL;
id_ = id;
timing_ = StreamTiming::instance();
}
pcap_t* PcapRxStats::handle()
@ -117,8 +120,13 @@ _skip_filter:
ret = pcap_next_ex(handle_, &hdr, &data);
switch (ret) {
case 1: {
uint guid;
if (SignProtocol::packetGuid(data, hdr->caplen, &guid)) {
uint ttagId, guid;
if (SignProtocol::packetTtagId(data, hdr->caplen, &ttagId, &guid)) {
timing_->recordRxTime(id_, guid, ttagId, hdr->ts);
qDebug("XXXXX [%d RX] %ld:%ld ttag %u guid %u", id_,
hdr->ts.tv_sec, long(hdr->ts.tv_usec), ttagId, guid);
}
if (guid != SignProtocol::kInvalidGuid) {
streamStats_[guid].rx_pkts++;
streamStats_[guid].rx_bytes += hdr->caplen;
}

View File

@ -24,6 +24,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "pcapsession.h"
class StreamTiming;
class PcapRxStats: public PcapSession
{
public:
@ -48,7 +50,9 @@ private:
volatile State state_;
bool isDirectional_;
int id_;
int id_; // FIXME: rename to portId_
StreamTiming *timing_{nullptr};
};
#endif

View File

@ -24,6 +24,9 @@ QString PcapSession::debugStats()
{
QString dbgStats;
if (!handle_)
return QString();
#ifdef Q_OS_WIN32
static_assert(sizeof(struct pcap_stat) == 6*sizeof(uint),
"pcap_stat has less or more than 6 values");

View File

@ -21,14 +21,17 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "pcapextra.h"
#include "../common/sign.h"
#include "streamtiming.h"
#define Xnotify qWarning // FIXME
PcapTxTtagStats::PcapTxTtagStats(const char *device, int id)
: id_(id)
: portId_(id)
{
setObjectName(QString("TxTtagStats:%1").arg(device));
device_ = QString::fromLatin1(device);
timing_ = StreamTiming::instance();
}
void PcapTxTtagStats::run()
@ -37,9 +40,10 @@ void PcapTxTtagStats::run()
char errbuf[PCAP_ERRBUF_SIZE] = "";
struct bpf_program bpf;
const int optimize = 1;
QString capture_filter = QString("(ether[len - 5:5] == 0x%1%2)")
.arg(SignProtocol::kTypeLenTtag, 0, BASE_HEX)
.arg(SignProtocol::magic(), 0, BASE_HEX);
QString capture_filter = QString(
"(ether[len-4:4] == 0x%1) and (ether[len-5:1] == 0x%2")
.arg(SignProtocol::magic(), 0, BASE_HEX)
.arg(SignProtocol::kTypeLenTtag, 0, BASE_HEX);
qDebug("In %s", __PRETTY_FUNCTION__);
qDebug("pcap-filter: %s", qPrintable(capture_filter));
@ -102,8 +106,8 @@ _skip_filter:
uint guid;
if (SignProtocol::packetTtagId(data, hdr->caplen,
&ttagId, &guid)) {
// TODO: store packet timestamp with ttag/guid
qDebug("XXXXX [%d] %ld:%ld ttag %u guid %u", id_,
timing_->recordTxTime(portId_, guid, ttagId, hdr->ts);
qDebug("XXXXX [%d TX] %ld:%ld ttag %u guid %u", portId_,
hdr->ts.tv_sec, long(hdr->ts.tv_usec), ttagId, guid);
}
break;
@ -179,12 +183,15 @@ bool PcapTxTtagStats::isDirectional()
return isDirectional_;
}
// FIXME: move to PcapSession
// FIXME: moved to PcapSession, remove from here
// XXX: Implemented as reset on read
QString PcapTxTtagStats::debugStats()
{
QString dbgStats;
if (!handle_)
return QString();
#ifdef Q_OS_WIN32
static_assert(sizeof(struct pcap_stat) == 6*sizeof(uint),
"pcap_stat has less or more than 6 values");

View File

@ -22,6 +22,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "pcapsession.h"
class StreamTiming;
class PcapTxTtagStats: public PcapSession
{
public:
@ -48,8 +50,10 @@ private:
volatile State state_{kNotStarted};
volatile bool stop_{false};
int id_; // FIXME: needed?
int portId_;
struct pcap_stat lastPcapStats_;
StreamTiming *timing_{nullptr};
};
#endif

View File

@ -21,48 +21,51 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "timestamp.h"
#include <QCoreApplication>
StreamTiming::StreamTiming()
: QObject(nullptr) // FIXME: parent
{
// This class must be part of the main thread so that timers can work
Q_ASSERT(this->thread() == QCoreApplication::instance()->thread());
timer_ = new QTimer(this);
connect(timer_, &QTimer::timeout, this, &StreamTiming::processRecords);
timer_->setInterval(3000);
timer_->start();
gcTimer_ = new QTimer(this);
connect(gcTimer_, &QTimer::timeout, this, &StreamTiming::deleteStaleRecords);
gcTimer_->setInterval(30000);
//FIXME:gcTimer_->start();
}
bool StreamTiming::recordTxTime(uint portId, uint guid, uint ttagId,
struct timespec timestamp)
const struct timespec &timestamp)
{
Key key = makeKey(guid, ttagId);
TxRxKey key = makeKey(guid, ttagId);
TtagData value = { .timeStamp = timestamp, .portId = portId};
QMutexLocker locker(&txHashLock_);
txHash_.insert(key, value);
if (!timer_->isActive())
timer_->start();
return true;
}
bool StreamTiming::recordRxTime(uint portId, uint guid, uint ttagId,
struct timespec timestamp)
const struct timespec &timestamp)
{
Key key = makeKey(guid, ttagId);
TxRxKey key = makeKey(guid, ttagId);
TtagData value = { .timeStamp = timestamp, .portId = portId};
QMutexLocker locker(&rxHashLock_);
rxHash_.insert(key, value);
if (!timer_->isActive())
timer_->start();
return true;
}
bool StreamTiming::recordTxTime(uint portId, uint guid, uint ttagId,
struct timeval timestamp)
const struct timeval &timestamp)
{
struct timespec ts;
ts.tv_sec = timestamp.tv_sec;
@ -72,7 +75,7 @@ bool StreamTiming::recordTxTime(uint portId, uint guid, uint ttagId,
}
bool StreamTiming::recordRxTime(uint portId, uint guid, uint ttagId,
struct timeval timestamp)
const struct timeval &timestamp)
{
struct timespec ts;
ts.tv_sec = timestamp.tv_sec;
@ -81,6 +84,45 @@ bool StreamTiming::recordRxTime(uint portId, uint guid, uint ttagId,
return recordRxTime(portId, guid, ttagId, ts);
}
quint64 StreamTiming::delay(uint portId, uint guid)
{
Q_ASSERT(guid <= SignProtocol::kMaxGuid);
QMutexLocker locker(&timingLock_);
if (!timing_.contains(portId))
return 0;
Timing t = timing_.value(portId)->value(guid);
if (t.countDelays == 0)
return 0;
qDebug("XXXX [%u/%u] %lldns", portId, guid,
timespecToNsecs(t.sumDelays)/t.countDelays);
return timespecToNsecs(t.sumDelays)/t.countDelays;
}
void StreamTiming::clear(uint portId, uint guid)
{
// XXX: We need to clear only the final timing hash; rx/tx hashes
// are cleared by StreamTiming itself as part of processRecords and
// deleteStaleRecords respectively
QMutexLocker locker(&timingLock_);
if (!timing_.contains(portId))
return;
PortTiming *portTiming = timing_.value(portId);
if (!portTiming)
return;
if (guid == SignProtocol::kInvalidGuid)
portTiming->clear(); // remove ALL guids
else
portTiming->remove(guid);
}
int StreamTiming::processRecords()
{
// FIXME: yield after a certain count of records or time?
@ -97,11 +139,28 @@ int StreamTiming::processRecords()
struct timespec rxTime = i.value().timeStamp;
struct timespec diff;
timespecsub(&rxTime, &txTime, &diff);
auto timingHash = timing_[i.value().portId];
auto guidTiming = (*timingHash)[i.key()];
uint guid = i.key() >> 8;
uint portId = i.value().portId;
if (!timing_.contains(portId))
timing_.insert(portId, new PortTiming);
PortTiming *portTiming = timing_.value(portId);
Timing &guidTiming = (*portTiming)[guid];
timespecadd(&guidTiming.sumDelays, &diff, &guidTiming.sumDelays);
guidTiming.countDelays++;
count++;
qDebug("XXXXX [%u/%u] diff %ld.%ld (%ld.%ld - %ld.%ld)",
i.value().portId, i.key(),
diff.tv_sec, diff.tv_nsec,
rxTime.tv_sec, rxTime.tv_nsec,
txTime.tv_sec, txTime.tv_nsec);
qDebug("XXXXX %d:[%u/%u] total %ld.%ld count %u",
count, i.value().portId, i.key(),
guidTiming.sumDelays.tv_sec, guidTiming.sumDelays.tv_nsec,
guidTiming.countDelays);
}
i = rxHash_.erase(i);
}
@ -136,11 +195,13 @@ int StreamTiming::deleteStaleRecords()
if (diff.tv_sec > 30) {
i = txHash_.erase(i);
count++;
qDebug("XXXX -%d", count);
}
}
// FIXME: when to stop gc timer?
qDebug("XXXX garbage collected %d stale tx timing records", count);
return count;
}

View File

@ -20,6 +20,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#ifndef _STREAM_TIMING
#define _STREAM_TIMING
#include "../common/sign.h"
#include <QHash>
#include <QMutex>
#include <QTimer>
@ -33,14 +35,17 @@ public:
StreamTiming();
bool recordTxTime(uint portId, uint guid, uint ttagId,
struct timespec timestamp);
const struct timespec &timestamp);
bool recordRxTime(uint portId, uint guid, uint ttagId,
struct timespec timestamp);
const struct timespec &timestamp);
bool recordTxTime(uint portId, uint guid, uint ttagId,
struct timeval timestamp);
const struct timeval &timestamp);
bool recordRxTime(uint portId, uint guid, uint ttagId,
struct timeval timestamp);
const struct timeval &timestamp);
quint64 delay(uint portId, uint guid);
void clear(uint portId, uint guid = SignProtocol::kInvalidGuid);
static StreamTiming* instance();
@ -68,14 +73,18 @@ private:
};
// XXX: Key = guid (24 bit MSG), ttagid (8 bit LSB)
// XXX: TxRxKey = guid (24 bit MSG) + ttagid (8 bit LSB)
// TODO: encode tx port in in packet and use as part of key
typedef quint32 Key;
QHash<Key, TtagData> txHash_;
QHash<Key, TtagData> rxHash_;
QHash<uint, QHash<Key, Timing>*> timing_; // outer key => portId
typedef quint32 TxRxKey;
QHash<TxRxKey, TtagData> txHash_;
QHash<TxRxKey, TtagData> rxHash_;
QMutex txHashLock_;
QMutex rxHashLock_;
typedef uint PortIdKey;
typedef uint GuidKey;
typedef QHash<GuidKey, Timing> PortTiming;
QHash<PortIdKey, PortTiming*> timing_;
QMutex timingLock_; // FIXME: change to RW lock?
QTimer *timer_; // Periodic timer to process tx/rx records