Change algo to infer next Ttag pkt (interleaved mode)

The algo works for the following cases of interleaved streams -
 * pktListDuration < ttagTimeInterval
 * pktListDuration > ttagTimeInterval
 * some streams have Ttag, some don't
    - first stream has Ttag
    - first stream does NOT have Ttag
 * no streams have Ttag

Changes for sequential mode are pending
This commit is contained in:
Srivats P 2023-04-21 18:32:17 +05:30
parent d1d2a5c1b5
commit ef1c166e7f
8 changed files with 97 additions and 18 deletions

View File

@ -436,6 +436,8 @@ int AbstractPort::updatePacketListInterleaved()
int numStreams = 0; int numStreams = 0;
quint64 minGap = ULLONG_MAX; quint64 minGap = ULLONG_MAX;
quint64 duration = quint64(1e3); // 1000ns (1us) quint64 duration = quint64(1e3); // 1000ns (1us)
// TODO: convert the below to a QList of struct aggregating all list vars
QList<int> streamId; QList<int> streamId;
QList<quint64> ibg1, ibg2; QList<quint64> ibg1, ibg2;
QList<quint64> nb1, nb2; QList<quint64> nb1, nb2;
@ -445,6 +447,7 @@ int AbstractPort::updatePacketListInterleaved()
QList<ulong> pktCount, burstCount; QList<ulong> pktCount, burstCount;
QList<ulong> burstSize; QList<ulong> burstSize;
QList<bool> isVariable; QList<bool> isVariable;
QList<bool> hasTtag;
QList<QByteArray> pktBuf; QList<QByteArray> pktBuf;
QList<ulong> pktLen; QList<ulong> pktLen;
int activeStreamCount = 0; int activeStreamCount = 0;
@ -594,6 +597,8 @@ int AbstractPort::updatePacketListInterleaved()
packetListAttrib += attrib; packetListAttrib += attrib;
} }
hasTtag.append(streamList_[i]->hasProtocol(
OstProto::Protocol::kSignFieldNumber));
numStreams++; numStreams++;
} // for i } // for i
@ -629,6 +634,8 @@ int AbstractPort::updatePacketListInterleaved()
// FIXME: Turbo still thinks it has to create implicit packet set for // FIXME: Turbo still thinks it has to create implicit packet set for
// interleaved mode - Turbo code should be changed once this is validated // interleaved mode - Turbo code should be changed once this is validated
qint64 totalPkts = 0; qint64 totalPkts = 0;
QSet<int> ttagMarkerStreams;
QList<uint> ttagMarkers;
do do
{ {
for (int i = 0; i < numStreams; i++) for (int i = 0; i < numStreams; i++)
@ -637,6 +644,12 @@ int AbstractPort::updatePacketListInterleaved()
if ((schedSec.at(i) > sec) || (schedNsec.at(i) > nsec)) if ((schedSec.at(i) > sec) || (schedNsec.at(i) > nsec))
continue; continue;
// One marker per stream
if (hasTtag.at(i) && !ttagMarkerStreams.contains(i)) {
ttagMarkerStreams.insert(i);
ttagMarkers.append(totalPkts);
}
for (uint j = 0; j < burstSize[i]; j++) for (uint j = 0; j < burstSize[i]; j++)
{ {
pktCount[i]++; pktCount[i]++;
@ -680,8 +693,9 @@ int AbstractPort::updatePacketListInterleaved()
+ (durNsec - lastPktTxNsec)) + (durNsec - lastPktTxNsec))
/totalPkts; /totalPkts;
loopNextPacketSet(totalPkts, 1, 0, avgDelay); loopNextPacketSet(totalPkts, 1, 0, avgDelay);
qDebug("Interleaved Packet Set of size %lld, repeat 1 and delay %ldns", qDebug("Interleaved PacketSet of size %lld, duration %llu.%09llu "
totalPkts, avgDelay); "repeat 1 and avg delay %ldns",
totalPkts, durSec, durNsec, avgDelay);
// Reset working sched/counts before building the packet list // Reset working sched/counts before building the packet list
sec = nsec = 0; sec = nsec = 0;
@ -722,7 +736,7 @@ int AbstractPort::updatePacketListInterleaved()
if (len <= 0) if (len <= 0)
continue; continue;
qDebug("q(%d) sec = %llu nsec = %llu", i, sec, nsec); qDebug("q(%d) TS = %llu.%09llu", i, sec, nsec);
if (!appendToPacketList(sec, nsec, buf, len)) { if (!appendToPacketList(sec, nsec, buf, len)) {
clearPacketList(); // don't leave it half baked/inconsitent clearPacketList(); // don't leave it half baked/inconsitent
packetListAttrib.errorFlags |= FrameValueAttrib::OutOfMemoryError; packetListAttrib.errorFlags |= FrameValueAttrib::OutOfMemoryError;
@ -767,10 +781,22 @@ int AbstractPort::updatePacketListInterleaved()
delayNsec += long(1e9); delayNsec += long(1e9);
delaySec--; delaySec--;
} }
qDebug("loop Delay = %lld/%lld", delaySec, delayNsec); qDebug("loop Delay = %lld.%09lld", delaySec, delayNsec);
setPacketListLoopMode(true, delaySec, delayNsec); setPacketListLoopMode(true, delaySec, delayNsec);
} }
// XXX: TTag repeat interval calculation:
// CASE 1. pktListDuration < kTtagTimeInterval:
// e.g. if pktListDuration is 1sec and TtagTimerInterval is 5s, we
// skip 5 times total packets before we repeat the markers
// CASE 2. pktListDuration > kTtagTimeInterval:
// e.g. if pktListDuration is 7sec and TtagTimerInterval is 5s, we
// skip repeat markers every pktList iteration
setPacketListTtagMarkers(ttagMarkers, ttagMarkers.isEmpty() ? 0 :
qMax(uint(kTtagTimeInterval_*1e9
/(durSec*1e9+durNsec)),
1U) * totalPkts);
_out_of_memory: _out_of_memory:
isSendQueueDirty_ = false; isSendQueueDirty_ = false;

View File

@ -107,6 +107,8 @@ public:
int length) = 0; int length) = 0;
virtual void setPacketListLoopMode(bool loop, virtual void setPacketListLoopMode(bool loop,
quint64 secDelay, quint64 nsecDelay) = 0; quint64 secDelay, quint64 nsecDelay) = 0;
virtual void setPacketListTtagMarkers(QList<uint> markers,
uint repeatInterval) = 0;
int updatePacketList(); int updatePacketList();
virtual void startTransmit() = 0; virtual void startTransmit() = 0;
@ -163,6 +165,8 @@ protected:
StreamStats streamStats_; StreamStats streamStats_;
//! \todo Need lock for stats access/update //! \todo Need lock for stats access/update
const uint kTtagTimeInterval_{5}; // in seconds
struct InterfaceInfo *interfaceInfo_; struct InterfaceInfo *interfaceInfo_;
DeviceManager *deviceManager_; DeviceManager *deviceManager_;

View File

@ -39,7 +39,6 @@ public:
repeatSize_ = 1; repeatSize_ = 1;
usecDelay_ = 0; usecDelay_ = 0;
ttagL4CksumOffset_ = 0; ttagL4CksumOffset_ = 0;
ttagPktInterval_ = 0;
} }
~PacketSequence() { ~PacketSequence() {
pcap_sendqueue_destroy(sendQueue_); pcap_sendqueue_destroy(sendQueue_);
@ -85,7 +84,6 @@ public:
int repeatSize_; int repeatSize_;
long usecDelay_; long usecDelay_;
quint16 ttagL4CksumOffset_; // For ttag packets quint16 ttagL4CksumOffset_; // For ttag packets
qulonglong ttagPktInterval_; // ttag pkt once every X packets
StreamStats streamStatsMeta_; StreamStats streamStatsMeta_;
private: private:

View File

@ -48,6 +48,7 @@ public:
virtual void clearPacketList() { virtual void clearPacketList() {
transmitter_->clearPacketList(); transmitter_->clearPacketList();
setPacketListLoopMode(false, 0, 0); setPacketListLoopMode(false, 0, 0);
setPacketListTtagMarkers(QList<uint>(), 0);
} }
virtual void loopNextPacketSet(qint64 size, qint64 repeats, virtual void loopNextPacketSet(qint64 size, qint64 repeats,
long repeatDelaySec, long repeatDelayNsec) { long repeatDelaySec, long repeatDelayNsec) {
@ -62,6 +63,11 @@ public:
{ {
transmitter_->setPacketListLoopMode(loop, secDelay, nsecDelay); transmitter_->setPacketListLoopMode(loop, secDelay, nsecDelay);
} }
virtual void setPacketListTtagMarkers(QList<uint> markers,
uint repeatInterval)
{
transmitter_->setPacketListTtagMarkers(markers, repeatInterval);
}
virtual void startTransmit() { virtual void startTransmit() {
Q_ASSERT(!isDirty()); Q_ASSERT(!isDirty());

View File

@ -88,6 +88,13 @@ void PcapTransmitter::setPacketListLoopMode(
txThread_.setPacketListLoopMode(loop, secDelay, nsecDelay); txThread_.setPacketListLoopMode(loop, secDelay, nsecDelay);
} }
void PcapTransmitter::setPacketListTtagMarkers(
QList<uint> markers,
uint repeatInterval)
{
txThread_.setPacketListTtagMarkers(markers, repeatInterval);
}
void PcapTransmitter::useExternalStats(AbstractPort::PortStats *stats) void PcapTransmitter::useExternalStats(AbstractPort::PortStats *stats)
{ {
txStats_.useExternalStats(stats); txStats_.useExternalStats(stats);

View File

@ -42,6 +42,7 @@ public:
bool appendToPacketList(long sec, long usec, const uchar *packet, bool appendToPacketList(long sec, long usec, const uchar *packet,
int length); int length);
void setPacketListLoopMode(bool loop, quint64 secDelay, quint64 nsecDelay); void setPacketListLoopMode(bool loop, quint64 secDelay, quint64 nsecDelay);
void setPacketListTtagMarkers(QList<uint> markers, uint repeatInterval);
void setHandle(pcap_t *handle); void setHandle(pcap_t *handle);
void useExternalStats(AbstractPort::PortStats *stats); void useExternalStats(AbstractPort::PortStats *stats);

View File

@ -23,6 +23,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "statstuple.h" #include "statstuple.h"
#include "timestamp.h" #include "timestamp.h"
#include <QtDebug>
PcapTxThread::PcapTxThread(const char *device) PcapTxThread::PcapTxThread(const char *device)
{ {
char errbuf[PCAP_ERRBUF_SIZE] = ""; char errbuf[PCAP_ERRBUF_SIZE] = "";
@ -133,8 +135,6 @@ void PcapTxThread::loopNextPacketSet(qint64 size, qint64 repeats,
currentPacketSequence_->repeatCount_ = repeats; currentPacketSequence_->repeatCount_ = repeats;
currentPacketSequence_->usecDelay_ = repeatDelaySec * long(1e6) currentPacketSequence_->usecDelay_ = repeatDelaySec * long(1e6)
+ repeatDelayNsec/1000; + repeatDelayNsec/1000;
currentPacketSequence_->ttagPktInterval_ =
kTtagPktInterval*1e6/currentPacketSequence_->usecDelay_;
repeatSequenceStart_ = packetSequenceList_.size(); repeatSequenceStart_ = packetSequenceList_.size();
repeatSize_ = size; repeatSize_ = size;
@ -205,8 +205,6 @@ bool PcapTxThread::appendToPacketList(long sec, long nsec,
start->usecDelay_ = 0; start->usecDelay_ = 0;
start->repeatSize_ = start->repeatSize_ =
packetSequenceList_.size() - repeatSequenceStart_; packetSequenceList_.size() - repeatSequenceStart_;
start->ttagPktInterval_ = kTtagPktInterval*1e6
/currentPacketSequence_->usecDelay_;
} }
repeatSize_ = 0; repeatSize_ = 0;
@ -227,6 +225,27 @@ void PcapTxThread::setPacketListLoopMode(
loopDelay_ = secDelay*long(1e6) + nsecDelay/1000; loopDelay_ = secDelay*long(1e6) + nsecDelay/1000;
} }
void PcapTxThread::setPacketListTtagMarkers(
QList<uint> markers,
uint repeatInterval)
{
// XXX: Empty markers => no streams have Ttag
firstTtagPkt_ = markers.isEmpty() ? -1 : int(markers.first());
// Calculate delta markers
ttagDeltaMarkers_.clear();
for (int i = 1; i < markers.size(); i++)
ttagDeltaMarkers_.append(markers.at(i) - markers.at(i-1));
if (!markers.isEmpty()) {
ttagDeltaMarkers_.append(repeatInterval - markers.last()
+ markers.first());
qDebug() << "TtagRepeatInterval:" << repeatInterval;
qDebug() << "FirstTtagPkt:" << firstTtagPkt_;
qDebug() << "TtagMarkers:" << ttagDeltaMarkers_;
}
}
void PcapTxThread::setHandle(pcap_t *handle) void PcapTxThread::setHandle(pcap_t *handle)
{ {
if (usingInternalHandle_) if (usingInternalHandle_)
@ -282,15 +301,23 @@ void PcapTxThread::run()
packetSequenceList_.at(i)->repeatCount_, packetSequenceList_.at(i)->repeatCount_,
packetSequenceList_.at(i)->repeatSize_, packetSequenceList_.at(i)->repeatSize_,
packetSequenceList_.at(i)->usecDelay_); packetSequenceList_.at(i)->usecDelay_);
qDebug("sendQ[%d]: pkts = %ld, usecDuration = %ld, ttagL4CksumOfs = %hu ttagPktIntvl = %llu", i, qDebug("sendQ[%d]: pkts = %ld, usecDuration = %ld, ttagL4CksumOfs = %hu", i,
packetSequenceList_.at(i)->packets_, packetSequenceList_.at(i)->packets_,
packetSequenceList_.at(i)->usecDuration_, packetSequenceList_.at(i)->usecDuration_,
packetSequenceList_.at(i)->ttagL4CksumOffset_, packetSequenceList_.at(i)->ttagL4CksumOffset_);
packetSequenceList_.at(i)->ttagPktInterval_);
} }
qDebug() << "First Ttag: " << firstTtagPkt_
<< "Ttag Markers:" << ttagDeltaMarkers_;
lastStats_ = *stats_; // used for stream stats lastStats_ = *stats_; // used for stream stats
// Init Ttag related vars. If no packets need ttag, firstTtagPkt_ is -1,
// so nextTagPkt_ is set to practically unreachable value (due to
// 64 bit counter wraparound time!)
ttagMarkerIndex_ = 0;
nextTtagPkt_ = stats_->pkts + firstTtagPkt_;
getTimeStamp(&startTime); getTimeStamp(&startTime);
state_ = kRunning; state_ = kRunning;
i = 0; i = 0;
@ -416,6 +443,7 @@ double PcapTxThread::lastTxDuration()
return lastTxDuration_; return lastTxDuration_;
} }
// FIXME: do we need 'seq' as func param?
int PcapTxThread::sendQueueTransmit(pcap_t *p, PacketSequence *seq, int PcapTxThread::sendQueueTransmit(pcap_t *p, PacketSequence *seq,
long &overHead, int sync) long &overHead, int sync)
{ {
@ -434,9 +462,7 @@ int PcapTxThread::sendQueueTransmit(pcap_t *p, PacketSequence *seq,
quint16 origCksum = 0; quint16 origCksum = 0;
// Time for a T-Tag packet? // Time for a T-Tag packet?
// FIXME: optimize per packet costly modulo operation if (stats_->pkts == nextTtagPkt_) {
if (seq->ttagPktInterval_
&& ((stats_->pkts % seq->ttagPktInterval_) == 0)) {
ttagPkt = true; ttagPkt = true;
// XXX: write 2xBytes instead of 1xHalf-word to avoid // XXX: write 2xBytes instead of 1xHalf-word to avoid
// potential alignment problem // potential alignment problem
@ -467,6 +493,10 @@ int PcapTxThread::sendQueueTransmit(pcap_t *p, PacketSequence *seq,
*cksum = qToBigEndian(quint16(~newCksum)); *cksum = qToBigEndian(quint16(~newCksum));
} }
ttagId_++; ttagId_++;
nextTtagPkt_ += ttagDeltaMarkers_.at(ttagMarkerIndex_);
ttagMarkerIndex_++;
if (ttagMarkerIndex_ >= ttagDeltaMarkers_.size())
ttagMarkerIndex_ = 0;
} }
if (sync) { if (sync) {

View File

@ -42,6 +42,7 @@ public:
bool appendToPacketList(long sec, long usec, const uchar *packet, bool appendToPacketList(long sec, long usec, const uchar *packet,
int length); int length);
void setPacketListLoopMode(bool loop, quint64 secDelay, quint64 nsecDelay); void setPacketListLoopMode(bool loop, quint64 secDelay, quint64 nsecDelay);
void setPacketListTtagMarkers(QList<uint> markers, uint repeatInterval);
void setHandle(pcap_t *handle); void setHandle(pcap_t *handle);
@ -80,7 +81,7 @@ private:
quint64 packetListSize_; // count of pkts in packet List including repeats quint64 packetListSize_; // count of pkts in packet List including repeats
int returnToQIdx_; int returnToQIdx_;
quint64 loopDelay_; quint64 loopDelay_; // in nanosecs
void (*udelayFn_)(unsigned long); void (*udelayFn_)(unsigned long);
@ -97,7 +98,13 @@ private:
double lastTxDuration_{0.0}; // in secs double lastTxDuration_{0.0}; // in secs
const ulong kTtagPktInterval{5}; // T-Tag pkt once every X sec // XXX: Ttag Marker config derived; not updated during Tx
int firstTtagPkt_;
QList<uint> ttagDeltaMarkers_;
// XXX: Ttag related; updated during Tx
int ttagMarkerIndex_;
quint64 nextTtagPkt_{0};
}; };
#endif #endif